diff --git a/chain/deals/handler.go b/chain/deals/handler.go index a7ff46fe1..94f64a96b 100644 --- a/chain/deals/handler.go +++ b/chain/deals/handler.go @@ -6,8 +6,8 @@ import ( "github.com/filecoin-project/go-lotus/api" "github.com/filecoin-project/go-lotus/chain/address" - "github.com/filecoin-project/go-lotus/lib/sectorbuilder" "github.com/filecoin-project/go-lotus/node/modules/dtypes" + "github.com/filecoin-project/go-lotus/storage/sector" "github.com/ipfs/go-cid" "github.com/ipfs/go-datastore" @@ -35,8 +35,8 @@ type MinerDeal struct { } type Handler struct { - sb *sectorbuilder.SectorBuilder - full api.FullNode + secst *sector.Store + full api.FullNode // TODO: Use a custom protocol or graphsync in the future // TODO: GC @@ -60,7 +60,7 @@ type dealUpdate struct { mut func(*MinerDeal) } -func NewHandler(ds dtypes.MetadataDS, sb *sectorbuilder.SectorBuilder, dag dtypes.StagingDAG, fullNode api.FullNode) (*Handler, error) { +func NewHandler(ds dtypes.MetadataDS, secst *sector.Store, dag dtypes.StagingDAG, fullNode api.FullNode) (*Handler, error) { addr, err := ds.Get(datastore.NewKey("miner-address")) if err != nil { return nil, err @@ -71,9 +71,9 @@ func NewHandler(ds dtypes.MetadataDS, sb *sectorbuilder.SectorBuilder, dag dtype } return &Handler{ - sb: sb, - dag: dag, - full: fullNode, + secst: secst, + dag: dag, + full: fullNode, conns: map[cid.Cid]inet.Stream{}, diff --git a/chain/deals/handler_states.go b/chain/deals/handler_states.go index 6c76ab969..486af2c83 100644 --- a/chain/deals/handler_states.go +++ b/chain/deals/handler_states.go @@ -2,8 +2,6 @@ package deals import ( "context" - "time" - "github.com/filecoin-project/go-lotus/lib/sectorbuilder" files "github.com/ipfs/go-ipfs-files" @@ -88,11 +86,7 @@ func (h *Handler) staged(ctx context.Context, deal MinerDeal) (func(*MinerDeal), return nil, xerrors.Errorf("failed to get file size: %s", err) } - var sectorID uint64 - err = withTemp(uf, func(f string) (err error) { - sectorID, err = h.sb.AddPiece(deal.Proposal.PieceRef, uint64(size), f) - return err - }) + sectorID, err := h.secst.AddPiece(deal.Proposal.PieceRef, uint64(size), uf, deal.Proposal.Duration) if err != nil { return nil, xerrors.Errorf("AddPiece failed: %s", err) } @@ -117,37 +111,29 @@ func getInclusionProof(ref string, status sectorbuilder.SectorSealingStatus) (Pi return PieceInclusionProof{}, xerrors.Errorf("pieceInclusionProof for %s in sector %d not found", ref, status.SectorID) } -func (h *Handler) pollSectorSealed(deal MinerDeal) (status sectorbuilder.SectorSealingStatus, err error) { -loop: - for { - status, err = h.sb.SealStatus(deal.SectorID) - if err != nil { - return sectorbuilder.SectorSealingStatus{}, err - } - - switch status.SealStatusCode { - case 0: // sealed - break loop - case 2: // failed - return sectorbuilder.SectorSealingStatus{}, xerrors.Errorf("sealing sector %d for deal %s (ref=%s) failed: %s", deal.SectorID, deal.ProposalCid, deal.Ref, status.SealErrorMsg) - case 1: // pending - if err := h.sb.SealAllStagedSectors(); err != nil { - return sectorbuilder.SectorSealingStatus{}, err - } - // start seal - fallthrough - case 3: // sealing - // wait - default: - return sectorbuilder.SectorSealingStatus{}, xerrors.Errorf("unknown SealStatusCode: %d", status.SectorID) - } - time.Sleep(3 * time.Second) +func (h *Handler) waitSealed(deal MinerDeal) (sectorbuilder.SectorSealingStatus, error) { + status, err := h.secst.WaitSeal(context.TODO(), deal.SectorID) + if err != nil { + return sectorbuilder.SectorSealingStatus{}, err } + + switch status.SealStatusCode { + case 0: // sealed + case 2: // failed + return sectorbuilder.SectorSealingStatus{}, xerrors.Errorf("sealing sector %d for deal %s (ref=%s) failed: %s", deal.SectorID, deal.ProposalCid, deal.Ref, status.SealErrorMsg) + case 1: // pending + return sectorbuilder.SectorSealingStatus{}, xerrors.Errorf("sector status was 'pending' after call to WaitSeal (for sector %d)", deal.SectorID) + case 3: // sealing + return sectorbuilder.SectorSealingStatus{}, xerrors.Errorf("sector status was 'wait' after call to WaitSeal (for sector %d)", deal.SectorID) + default: + return sectorbuilder.SectorSealingStatus{}, xerrors.Errorf("unknown SealStatusCode: %d", status.SectorID) + } + return status, nil } func (h *Handler) sealing(ctx context.Context, deal MinerDeal) (func(*MinerDeal), error) { - status, err := h.pollSectorSealed(deal) + status, err := h.waitSealed(deal) if err != nil { return nil, err } diff --git a/lib/sectorbuilder/poll.go b/lib/sectorbuilder/poll.go deleted file mode 100644 index edf425f37..000000000 --- a/lib/sectorbuilder/poll.go +++ /dev/null @@ -1,56 +0,0 @@ -package sectorbuilder - -import ( - "context" - "time" -) - -// TODO: really need to get a callbacks API from the rust-sectorbuilder -func (sb *SectorBuilder) pollForSealedSectors(ctx context.Context) { - watching := make(map[uint64]bool) - - staged, err := sb.GetAllStagedSectors() - if err != nil { - // TODO: this is probably worth shutting the miner down over until we - // have better recovery mechanisms - log.Errorf("failed to get staged sectors: %s", err) - } - for _, s := range staged { - watching[s.SectorID] = true - } - - tick := time.Tick(time.Second * 5) - for { - select { - case <-tick: - log.Info("polling for sealed sectors...") - - // add new staged sectors to watch list - staged, err := sb.GetAllStagedSectors() - if err != nil { - log.Errorf("in loop: failed to get staged sectors: %s", err) - continue - } - - for _, s := range staged { - watching[s.SectorID] = true - } - - for s := range watching { - status, err := sb.SealStatus(s) - if err != nil { - log.Errorf("getting seal status: %s", err) - continue - } - - if status.SealStatusCode == 0 { // constant pls, zero implies the last step? - delete(watching, s) - sb.sschan <- status - } - } - case <-ctx.Done(): - close(sb.sschan) - return - } - } -} diff --git a/lib/sectorbuilder/sectorbuilder.go b/lib/sectorbuilder/sectorbuilder.go index 28fd72d22..8ad6d61f4 100644 --- a/lib/sectorbuilder/sectorbuilder.go +++ b/lib/sectorbuilder/sectorbuilder.go @@ -1,7 +1,6 @@ package sectorbuilder import ( - "context" "encoding/binary" "unsafe" @@ -22,8 +21,6 @@ const CommLen = sectorbuilder.CommitmentBytesLen type SectorBuilder struct { handle unsafe.Pointer - - sschan chan SectorSealingStatus } type SectorBuilderConfig struct { @@ -44,7 +41,6 @@ func New(cfg *SectorBuilderConfig) (*SectorBuilder, error) { return &SectorBuilder{ handle: sbp, - sschan: make(chan SectorSealingStatus, 32), }, nil } @@ -60,10 +56,6 @@ func sectorIDtoBytes(sid uint64) [31]byte { return out } -func (sb *SectorBuilder) Run(ctx context.Context) { - go sb.pollForSealedSectors(ctx) -} - func (sb *SectorBuilder) Destroy() { sectorbuilder.DestroySectorBuilder(sb.handle) } @@ -95,11 +87,6 @@ func (sb *SectorBuilder) GeneratePoSt(sortedCommRs [][CommLen]byte, challengeSee return sectorbuilder.GeneratePoSt(sb.handle, sortedCommRs, challengeSeed) } -func (sb *SectorBuilder) SealedSectorChan() <-chan SectorSealingStatus { - // is this ever going to be multi-consumer? If so, switch to using pubsub/eventbus - return sb.sschan -} - var UserBytesForSectorSize = sectorbuilder.GetMaxUserBytesPerStagedSector func VerifySeal(sectorSize uint64, commR, commD, commRStar []byte, proverID address.Address, sectorID uint64, proof []byte) (bool, error) { diff --git a/lib/sectorbuilder/sectorbuilder_test.go b/lib/sectorbuilder/sectorbuilder_test.go index 211bbcbbc..ebe28cfac 100644 --- a/lib/sectorbuilder/sectorbuilder_test.go +++ b/lib/sectorbuilder/sectorbuilder_test.go @@ -1,14 +1,14 @@ -package sectorbuilder +package sectorbuilder_test import ( - "context" - "fmt" "io" "io/ioutil" "math/rand" "testing" "github.com/filecoin-project/go-lotus/chain/address" + "github.com/filecoin-project/go-lotus/lib/sectorbuilder" + "github.com/filecoin-project/go-lotus/storage/sector" ) func TestSealAndVerify(t *testing.T) { @@ -23,7 +23,7 @@ func TestSealAndVerify(t *testing.T) { t.Fatal(err) } - sb, err := New(&SectorBuilderConfig{ + sb, err := sectorbuilder.New(§orbuilder.SectorBuilderConfig{ SectorSize: 1024, SealedDir: dir, StagedDir: dir, @@ -34,11 +34,6 @@ func TestSealAndVerify(t *testing.T) { t.Fatal(err) } - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - sb.Run(ctx) - fi, err := ioutil.TempFile("", "sbtestfi") if err != nil { t.Fatal(err) @@ -51,10 +46,11 @@ func TestSealAndVerify(t *testing.T) { t.Fatal(err) } - ssinfo := <-sb.SealedSectorChan() - fmt.Println("sector sealed...") + store := sector.NewStore(sb) + store.Service() + ssinfo := <-store.Incoming() - ok, err := VerifySeal(1024, ssinfo.CommR[:], ssinfo.CommD[:], ssinfo.CommRStar[:], addr, ssinfo.SectorID, ssinfo.Proof) + ok, err := sectorbuilder.VerifySeal(1024, ssinfo.CommR[:], ssinfo.CommD[:], ssinfo.CommRStar[:], addr, ssinfo.SectorID, ssinfo.Proof) if err != nil { t.Fatal(err) } diff --git a/node/builder.go b/node/builder.go index d541f8d2d..ff197b425 100644 --- a/node/builder.go +++ b/node/builder.go @@ -35,6 +35,7 @@ import ( "github.com/filecoin-project/go-lotus/node/repo" "github.com/filecoin-project/go-lotus/paych" "github.com/filecoin-project/go-lotus/storage" + "github.com/filecoin-project/go-lotus/storage/sector" ) // special is a type used to give keys to modules which @@ -74,7 +75,10 @@ const ( HandleIncomingMessagesKey RunDealClientKey + + // storage miner HandleDealsKey + RunSectorServiceKey // daemon ExtractApiKey @@ -222,13 +226,15 @@ func Online() Option { // Storage miner ApplyIf(func(s *Settings) bool { return s.nodeType == nodeStorageMiner }, - Override(new(*sectorbuilder.SectorBuilder), modules.SectorBuilder), + Override(new(*sectorbuilder.SectorBuilder), sectorbuilder.New), + Override(new(*sector.Store), sector.NewStore), Override(new(*storage.Miner), modules.StorageMiner), Override(new(dtypes.StagingDAG), modules.StagingDAG), Override(new(*deals.Handler), deals.NewHandler), Override(HandleDealsKey, modules.HandleDeals), + Override(RunSectorServiceKey, modules.RunSectorService), ), ) } diff --git a/node/impl/storminer.go b/node/impl/storminer.go index a7ca3bbd0..2c3b6e552 100644 --- a/node/impl/storminer.go +++ b/node/impl/storminer.go @@ -3,13 +3,14 @@ package impl import ( "context" "fmt" - "github.com/filecoin-project/go-lotus/chain/address" - "io/ioutil" + "io" "math/rand" "github.com/filecoin-project/go-lotus/api" + "github.com/filecoin-project/go-lotus/chain/address" "github.com/filecoin-project/go-lotus/lib/sectorbuilder" "github.com/filecoin-project/go-lotus/storage" + "github.com/filecoin-project/go-lotus/storage/sector" ) type StorageMinerAPI struct { @@ -17,6 +18,7 @@ type StorageMinerAPI struct { SectorBuilderConfig *sectorbuilder.SectorBuilderConfig SectorBuilder *sectorbuilder.SectorBuilder + Sectors *sector.Store Miner *storage.Miner } @@ -26,20 +28,10 @@ func (sm *StorageMinerAPI) ActorAddresses(context.Context) ([]address.Address, e } func (sm *StorageMinerAPI) StoreGarbageData(ctx context.Context) (uint64, error) { - maxSize := uint64(1016) // this is the most data we can fit in a 1024 byte sector - data := make([]byte, maxSize) - fi, err := ioutil.TempFile("", "lotus-garbage") - if err != nil { - return 0, err - } - - if _, err := fi.Write(data); err != nil { - return 0, err - } - fi.Close() + size := uint64(1016) // this is the most data we can fit in a 1024 byte sector name := fmt.Sprintf("fake-file-%d", rand.Intn(100000000)) - sectorId, err := sm.SectorBuilder.AddPiece(name, maxSize, fi.Name()) + sectorId, err := sm.Sectors.AddPiece(name, size, io.LimitReader(rand.New(rand.NewSource(42)), 1016), 0) if err != nil { return 0, err } diff --git a/node/modules/services.go b/node/modules/services.go index 653ab91a7..babee96d9 100644 --- a/node/modules/services.go +++ b/node/modules/services.go @@ -2,6 +2,7 @@ package modules import ( "context" + "github.com/filecoin-project/go-lotus/storage/sector" "github.com/libp2p/go-libp2p-core/host" inet "github.com/libp2p/go-libp2p-core/network" @@ -69,3 +70,16 @@ func RunDealClient(lc fx.Lifecycle, c *deals.Client) { }, }) } + +func RunSectorService(lc fx.Lifecycle, secst *sector.Store) { + lc.Append(fx.Hook{ + OnStart: func(context.Context) error { + secst.Service() + return nil + }, + OnStop: func(context.Context) error { + secst.Stop() + return nil + }, + }) +} diff --git a/node/modules/storageminer.go b/node/modules/storageminer.go index 0cd005c2a..df068c99c 100644 --- a/node/modules/storageminer.go +++ b/node/modules/storageminer.go @@ -2,6 +2,7 @@ package modules import ( "context" + "github.com/filecoin-project/go-lotus/storage/sector" "path/filepath" "github.com/ipfs/go-bitswap" @@ -63,31 +64,13 @@ func SectorBuilderConfig(storagePath string) func(dtypes.MetadataDS) (*sectorbui } } -func SectorBuilder(mctx helpers.MetricsCtx, lc fx.Lifecycle, sbc *sectorbuilder.SectorBuilderConfig) (*sectorbuilder.SectorBuilder, error) { - sb, err := sectorbuilder.New(sbc) - if err != nil { - return nil, err - } - - ctx := helpers.LifecycleCtx(mctx, lc) - - lc.Append(fx.Hook{ - OnStart: func(context.Context) error { - sb.Run(ctx) - return nil - }, - }) - - return sb, nil -} - -func StorageMiner(mctx helpers.MetricsCtx, lc fx.Lifecycle, api api.FullNode, h host.Host, ds dtypes.MetadataDS, sb *sectorbuilder.SectorBuilder) (*storage.Miner, error) { +func StorageMiner(mctx helpers.MetricsCtx, lc fx.Lifecycle, api api.FullNode, h host.Host, ds dtypes.MetadataDS, secst *sector.Store) (*storage.Miner, error) { maddr, err := minerAddrFromDS(ds) if err != nil { return nil, err } - sm, err := storage.NewMiner(api, maddr, h, ds, sb) + sm, err := storage.NewMiner(api, maddr, h, ds, secst) if err != nil { return nil, err } diff --git a/storage/miner.go b/storage/miner.go index 93b1993d9..81e1a35b9 100644 --- a/storage/miner.go +++ b/storage/miner.go @@ -3,6 +3,7 @@ package storage import ( "context" "fmt" + "github.com/filecoin-project/go-lotus/storage/sector" "github.com/filecoin-project/go-lotus/api" "github.com/filecoin-project/go-lotus/chain/actors" @@ -23,7 +24,7 @@ var log = logging.Logger("storageminer") type Miner struct { api storageMinerApi - sb *sectorbuilder.SectorBuilder + secst *sector.Store maddr address.Address @@ -52,13 +53,13 @@ type storageMinerApi interface { WalletHas(context.Context, address.Address) (bool, error) } -func NewMiner(api storageMinerApi, addr address.Address, h host.Host, ds datastore.Batching, sb *sectorbuilder.SectorBuilder) (*Miner, error) { +func NewMiner(api storageMinerApi, addr address.Address, h host.Host, ds datastore.Batching, secst *sector.Store) (*Miner, error) { return &Miner{ api: api, maddr: addr, h: h, ds: ds, - sb: sb, + secst: secst, }, nil } @@ -73,9 +74,12 @@ func (m *Miner) Run(ctx context.Context) error { } func (m *Miner) handlePostingSealedSectors(ctx context.Context) { + incoming := m.secst.Incoming() + defer m.secst.CloseIncoming(incoming) + for { select { - case sinfo, ok := <-m.sb.SealedSectorChan(): + case sinfo, ok := <-incoming: if !ok { // TODO: set some state variable so that this state can be // visible via some status command diff --git a/storage/sector/store.go b/storage/sector/store.go new file mode 100644 index 000000000..2f366cbe3 --- /dev/null +++ b/storage/sector/store.go @@ -0,0 +1,190 @@ +package sector + +import ( + "context" + "io" + "io/ioutil" + "os" + "sync" + "time" + + "github.com/filecoin-project/go-lotus/lib/sectorbuilder" + + logging "github.com/ipfs/go-log" +) + +var log = logging.Logger("sectorstore") + +// TODO: eventually handle sector storage here instead of in rust-sectorbuilder +type Store struct { + lk sync.Mutex + sb *sectorbuilder.SectorBuilder + + waiting map[uint64]chan struct{} + incoming []chan sectorbuilder.SectorSealingStatus + // TODO: outdated chan + + closeCh chan struct{} +} + +func NewStore(sb *sectorbuilder.SectorBuilder) *Store { + return &Store{ + sb: sb, + waiting: map[uint64]chan struct{}{}, + closeCh: make(chan struct{}), + } +} + +func (s *Store) Service() { + go s.service() +} + +func (s *Store) poll() { + log.Info("polling for sealed sectors...") + + // get a list of sectors to poll + s.lk.Lock() + toPoll := make([]uint64, 0, len(s.waiting)) + + for id := range s.waiting { + toPoll = append(toPoll, id) + } + s.lk.Unlock() + + var done []sectorbuilder.SectorSealingStatus + + // check status of each + for _, sec := range toPoll { + status, err := s.sb.SealStatus(sec) + if err != nil { + log.Errorf("getting seal status: %s", err) + continue + } + + if status.SealStatusCode == 0 { // constant pls, zero implies the last step? + done = append(done, status) + } + } + + // send updates + s.lk.Lock() + for _, sector := range done { + watch, ok := s.waiting[sector.SectorID] + if ok { + close(watch) + delete(s.waiting, sector.SectorID) + } + for _, c := range s.incoming { + c <- sector // TODO: ctx! + } + } + s.lk.Unlock() +} + +func (s *Store) service() { + poll := time.Tick(5 * time.Second) + + for { + select { + case <-poll: + s.poll() + case <-s.closeCh: + s.lk.Lock() + for _, c := range s.incoming { + close(c) + } + s.lk.Unlock() + return + } + } +} + +func (s *Store) AddPiece(ref string, size uint64, r io.Reader, keepAtLeast uint64) (sectorID uint64, err error) { + err = withTemp(r, func(f string) (err error) { + sectorID, err = s.sb.AddPiece(ref, size, f) + return err + }) + if err != nil { + return 0, err + } + + s.lk.Lock() + _, exists := s.waiting[sectorID] + if !exists { // pieces can share sectors + s.waiting[sectorID] = make(chan struct{}) + } + s.lk.Unlock() + + return sectorID, nil +} + +func (s *Store) CloseIncoming(c <-chan sectorbuilder.SectorSealingStatus) { + s.lk.Lock() + var at = -1 + for i, ch := range s.incoming { + if ch == c { + at = i + } + } + if at == -1 { + s.lk.Unlock() + return + } + if len(s.incoming) > 1 { + last := len(s.incoming) - 1 + s.incoming[at] = s.incoming[last] + s.incoming[last] = nil + } + s.incoming = s.incoming[:len(s.incoming)-1] + s.lk.Unlock() +} + +func (s *Store) Incoming() <-chan sectorbuilder.SectorSealingStatus { + ch := make(chan sectorbuilder.SectorSealingStatus, 8) + s.lk.Lock() + s.incoming = append(s.incoming, ch) + s.lk.Unlock() + return ch +} + +func (s *Store) WaitSeal(ctx context.Context, sector uint64) (sectorbuilder.SectorSealingStatus, error) { + s.lk.Lock() + watch, ok := s.waiting[sector] + s.lk.Unlock() + if ok { + select { + case <-watch: + case <-ctx.Done(): + return sectorbuilder.SectorSealingStatus{}, ctx.Err() + } + } + + return s.sb.SealStatus(sector) +} + +func (s *Store) Stop() { + close(s.closeCh) +} + +func withTemp(r io.Reader, cb func(string) error) error { + f, err := ioutil.TempFile(os.TempDir(), "lotus-temp-") + if err != nil { + return err + } + if _, err := io.Copy(f, r); err != nil { + return err + } + if err := f.Close(); err != nil { + return err + } + + err = cb(f.Name()) + if err != nil { + if err := os.Remove(f.Name()); err != nil { + log.Errorf("couldn't remove temp file '%s'", f.Name()) + } + return err + } + + return os.Remove(f.Name()) +}