From 399f91940b9026375a71e02b8c129c1a5e870316 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Wed, 14 Aug 2019 22:27:10 +0200 Subject: [PATCH] Sector store --- chain/deals/handler.go | 14 ++-- chain/deals/handler_states.go | 52 +++++------- node/builder.go | 6 ++ node/impl/storminer.go | 18 ++--- node/modules/services.go | 14 ++++ node/modules/storageminer.go | 5 +- storage/miner.go | 12 ++- storage/sector/store.go | 145 ++++++++++++++++++++++++++++++++++ 8 files changed, 207 insertions(+), 59 deletions(-) create mode 100644 storage/sector/store.go diff --git a/chain/deals/handler.go b/chain/deals/handler.go index a7ff46fe1..94f64a96b 100644 --- a/chain/deals/handler.go +++ b/chain/deals/handler.go @@ -6,8 +6,8 @@ import ( "github.com/filecoin-project/go-lotus/api" "github.com/filecoin-project/go-lotus/chain/address" - "github.com/filecoin-project/go-lotus/lib/sectorbuilder" "github.com/filecoin-project/go-lotus/node/modules/dtypes" + "github.com/filecoin-project/go-lotus/storage/sector" "github.com/ipfs/go-cid" "github.com/ipfs/go-datastore" @@ -35,8 +35,8 @@ type MinerDeal struct { } type Handler struct { - sb *sectorbuilder.SectorBuilder - full api.FullNode + secst *sector.Store + full api.FullNode // TODO: Use a custom protocol or graphsync in the future // TODO: GC @@ -60,7 +60,7 @@ type dealUpdate struct { mut func(*MinerDeal) } -func NewHandler(ds dtypes.MetadataDS, sb *sectorbuilder.SectorBuilder, dag dtypes.StagingDAG, fullNode api.FullNode) (*Handler, error) { +func NewHandler(ds dtypes.MetadataDS, secst *sector.Store, dag dtypes.StagingDAG, fullNode api.FullNode) (*Handler, error) { addr, err := ds.Get(datastore.NewKey("miner-address")) if err != nil { return nil, err @@ -71,9 +71,9 @@ func NewHandler(ds dtypes.MetadataDS, sb *sectorbuilder.SectorBuilder, dag dtype } return &Handler{ - sb: sb, - dag: dag, - full: fullNode, + secst: secst, + dag: dag, + full: fullNode, conns: map[cid.Cid]inet.Stream{}, diff --git a/chain/deals/handler_states.go b/chain/deals/handler_states.go index 6c76ab969..486af2c83 100644 --- a/chain/deals/handler_states.go +++ b/chain/deals/handler_states.go @@ -2,8 +2,6 @@ package deals import ( "context" - "time" - "github.com/filecoin-project/go-lotus/lib/sectorbuilder" files "github.com/ipfs/go-ipfs-files" @@ -88,11 +86,7 @@ func (h *Handler) staged(ctx context.Context, deal MinerDeal) (func(*MinerDeal), return nil, xerrors.Errorf("failed to get file size: %s", err) } - var sectorID uint64 - err = withTemp(uf, func(f string) (err error) { - sectorID, err = h.sb.AddPiece(deal.Proposal.PieceRef, uint64(size), f) - return err - }) + sectorID, err := h.secst.AddPiece(deal.Proposal.PieceRef, uint64(size), uf, deal.Proposal.Duration) if err != nil { return nil, xerrors.Errorf("AddPiece failed: %s", err) } @@ -117,37 +111,29 @@ func getInclusionProof(ref string, status sectorbuilder.SectorSealingStatus) (Pi return PieceInclusionProof{}, xerrors.Errorf("pieceInclusionProof for %s in sector %d not found", ref, status.SectorID) } -func (h *Handler) pollSectorSealed(deal MinerDeal) (status sectorbuilder.SectorSealingStatus, err error) { -loop: - for { - status, err = h.sb.SealStatus(deal.SectorID) - if err != nil { - return sectorbuilder.SectorSealingStatus{}, err - } - - switch status.SealStatusCode { - case 0: // sealed - break loop - case 2: // failed - return sectorbuilder.SectorSealingStatus{}, xerrors.Errorf("sealing sector %d for deal %s (ref=%s) failed: %s", deal.SectorID, deal.ProposalCid, deal.Ref, status.SealErrorMsg) - case 1: // pending - if err := h.sb.SealAllStagedSectors(); err != nil { - return sectorbuilder.SectorSealingStatus{}, err - } - // start seal - fallthrough - case 3: // sealing - // wait - default: - return sectorbuilder.SectorSealingStatus{}, xerrors.Errorf("unknown SealStatusCode: %d", status.SectorID) - } - time.Sleep(3 * time.Second) +func (h *Handler) waitSealed(deal MinerDeal) (sectorbuilder.SectorSealingStatus, error) { + status, err := h.secst.WaitSeal(context.TODO(), deal.SectorID) + if err != nil { + return sectorbuilder.SectorSealingStatus{}, err } + + switch status.SealStatusCode { + case 0: // sealed + case 2: // failed + return sectorbuilder.SectorSealingStatus{}, xerrors.Errorf("sealing sector %d for deal %s (ref=%s) failed: %s", deal.SectorID, deal.ProposalCid, deal.Ref, status.SealErrorMsg) + case 1: // pending + return sectorbuilder.SectorSealingStatus{}, xerrors.Errorf("sector status was 'pending' after call to WaitSeal (for sector %d)", deal.SectorID) + case 3: // sealing + return sectorbuilder.SectorSealingStatus{}, xerrors.Errorf("sector status was 'wait' after call to WaitSeal (for sector %d)", deal.SectorID) + default: + return sectorbuilder.SectorSealingStatus{}, xerrors.Errorf("unknown SealStatusCode: %d", status.SectorID) + } + return status, nil } func (h *Handler) sealing(ctx context.Context, deal MinerDeal) (func(*MinerDeal), error) { - status, err := h.pollSectorSealed(deal) + status, err := h.waitSealed(deal) if err != nil { return nil, err } diff --git a/node/builder.go b/node/builder.go index d541f8d2d..9155da058 100644 --- a/node/builder.go +++ b/node/builder.go @@ -35,6 +35,7 @@ import ( "github.com/filecoin-project/go-lotus/node/repo" "github.com/filecoin-project/go-lotus/paych" "github.com/filecoin-project/go-lotus/storage" + "github.com/filecoin-project/go-lotus/storage/sector" ) // special is a type used to give keys to modules which @@ -74,7 +75,10 @@ const ( HandleIncomingMessagesKey RunDealClientKey + + // storage miner HandleDealsKey + RunSectorServiceKey // daemon ExtractApiKey @@ -223,12 +227,14 @@ func Online() Option { // Storage miner ApplyIf(func(s *Settings) bool { return s.nodeType == nodeStorageMiner }, Override(new(*sectorbuilder.SectorBuilder), modules.SectorBuilder), + Override(new(*sector.Store), sector.NewStore), Override(new(*storage.Miner), modules.StorageMiner), Override(new(dtypes.StagingDAG), modules.StagingDAG), Override(new(*deals.Handler), deals.NewHandler), Override(HandleDealsKey, modules.HandleDeals), + Override(RunSectorServiceKey, modules.RunSectorService), ), ) } diff --git a/node/impl/storminer.go b/node/impl/storminer.go index a7ca3bbd0..77c93bd8f 100644 --- a/node/impl/storminer.go +++ b/node/impl/storminer.go @@ -4,7 +4,8 @@ import ( "context" "fmt" "github.com/filecoin-project/go-lotus/chain/address" - "io/ioutil" + "github.com/filecoin-project/go-lotus/storage/sector" + "io" "math/rand" "github.com/filecoin-project/go-lotus/api" @@ -17,6 +18,7 @@ type StorageMinerAPI struct { SectorBuilderConfig *sectorbuilder.SectorBuilderConfig SectorBuilder *sectorbuilder.SectorBuilder + Sectors *sector.Store Miner *storage.Miner } @@ -26,20 +28,10 @@ func (sm *StorageMinerAPI) ActorAddresses(context.Context) ([]address.Address, e } func (sm *StorageMinerAPI) StoreGarbageData(ctx context.Context) (uint64, error) { - maxSize := uint64(1016) // this is the most data we can fit in a 1024 byte sector - data := make([]byte, maxSize) - fi, err := ioutil.TempFile("", "lotus-garbage") - if err != nil { - return 0, err - } - - if _, err := fi.Write(data); err != nil { - return 0, err - } - fi.Close() + size := uint64(1016) // this is the most data we can fit in a 1024 byte sector name := fmt.Sprintf("fake-file-%d", rand.Intn(100000000)) - sectorId, err := sm.SectorBuilder.AddPiece(name, maxSize, fi.Name()) + sectorId, err := sm.Sectors.AddPiece(name, size, io.LimitReader(rand.New(rand.NewSource(42)), 1016), 0) if err != nil { return 0, err } diff --git a/node/modules/services.go b/node/modules/services.go index 653ab91a7..babee96d9 100644 --- a/node/modules/services.go +++ b/node/modules/services.go @@ -2,6 +2,7 @@ package modules import ( "context" + "github.com/filecoin-project/go-lotus/storage/sector" "github.com/libp2p/go-libp2p-core/host" inet "github.com/libp2p/go-libp2p-core/network" @@ -69,3 +70,16 @@ func RunDealClient(lc fx.Lifecycle, c *deals.Client) { }, }) } + +func RunSectorService(lc fx.Lifecycle, secst *sector.Store) { + lc.Append(fx.Hook{ + OnStart: func(context.Context) error { + secst.Service() + return nil + }, + OnStop: func(context.Context) error { + secst.Stop() + return nil + }, + }) +} diff --git a/node/modules/storageminer.go b/node/modules/storageminer.go index 0cd005c2a..ef24d6ae7 100644 --- a/node/modules/storageminer.go +++ b/node/modules/storageminer.go @@ -2,6 +2,7 @@ package modules import ( "context" + "github.com/filecoin-project/go-lotus/storage/sector" "path/filepath" "github.com/ipfs/go-bitswap" @@ -81,13 +82,13 @@ func SectorBuilder(mctx helpers.MetricsCtx, lc fx.Lifecycle, sbc *sectorbuilder. return sb, nil } -func StorageMiner(mctx helpers.MetricsCtx, lc fx.Lifecycle, api api.FullNode, h host.Host, ds dtypes.MetadataDS, sb *sectorbuilder.SectorBuilder) (*storage.Miner, error) { +func StorageMiner(mctx helpers.MetricsCtx, lc fx.Lifecycle, api api.FullNode, h host.Host, ds dtypes.MetadataDS, secst *sector.Store) (*storage.Miner, error) { maddr, err := minerAddrFromDS(ds) if err != nil { return nil, err } - sm, err := storage.NewMiner(api, maddr, h, ds, sb) + sm, err := storage.NewMiner(api, maddr, h, ds, secst) if err != nil { return nil, err } diff --git a/storage/miner.go b/storage/miner.go index 93b1993d9..81e1a35b9 100644 --- a/storage/miner.go +++ b/storage/miner.go @@ -3,6 +3,7 @@ package storage import ( "context" "fmt" + "github.com/filecoin-project/go-lotus/storage/sector" "github.com/filecoin-project/go-lotus/api" "github.com/filecoin-project/go-lotus/chain/actors" @@ -23,7 +24,7 @@ var log = logging.Logger("storageminer") type Miner struct { api storageMinerApi - sb *sectorbuilder.SectorBuilder + secst *sector.Store maddr address.Address @@ -52,13 +53,13 @@ type storageMinerApi interface { WalletHas(context.Context, address.Address) (bool, error) } -func NewMiner(api storageMinerApi, addr address.Address, h host.Host, ds datastore.Batching, sb *sectorbuilder.SectorBuilder) (*Miner, error) { +func NewMiner(api storageMinerApi, addr address.Address, h host.Host, ds datastore.Batching, secst *sector.Store) (*Miner, error) { return &Miner{ api: api, maddr: addr, h: h, ds: ds, - sb: sb, + secst: secst, }, nil } @@ -73,9 +74,12 @@ func (m *Miner) Run(ctx context.Context) error { } func (m *Miner) handlePostingSealedSectors(ctx context.Context) { + incoming := m.secst.Incoming() + defer m.secst.CloseIncoming(incoming) + for { select { - case sinfo, ok := <-m.sb.SealedSectorChan(): + case sinfo, ok := <-incoming: if !ok { // TODO: set some state variable so that this state can be // visible via some status command diff --git a/storage/sector/store.go b/storage/sector/store.go new file mode 100644 index 000000000..8bce6a7e0 --- /dev/null +++ b/storage/sector/store.go @@ -0,0 +1,145 @@ +package sector + +import ( + "context" + "github.com/filecoin-project/go-lotus/lib/sectorbuilder" + "io" + "io/ioutil" + "os" + "sync" +) + +// TODO: eventually handle sector storage here instead of in rust-sectorbuilder +type Store struct { + lk sync.Mutex + sb *sectorbuilder.SectorBuilder + + waiting map[uint64]chan struct{} + incoming []chan sectorbuilder.SectorSealingStatus + // TODO: outdated chan + + close chan struct{} +} + +func NewStore(sb *sectorbuilder.SectorBuilder) *Store { + return &Store{ + sb: sb, + waiting: map[uint64]chan struct{}{}, + close: make(chan struct{}), + } +} + +func (s *Store) Service() { + go s.service() +} + +func (s *Store) service() { + sealed := s.sb.SealedSectorChan() + + for { + select { + case sector := <-sealed: + s.lk.Lock() + watch, ok := s.waiting[sector.SectorID] + if ok { + close(watch) + delete(s.waiting, sector.SectorID) + } + for _, c := range s.incoming { + c <- sector // TODO: ctx! + } + s.lk.Unlock() + case <-s.close: + s.lk.Lock() + for _, c := range s.incoming { + close(c) + } + s.lk.Unlock() + return + } + } +} + +func (s *Store) AddPiece(ref string, size uint64, r io.Reader, keepAtLeast uint64) (sectorID uint64, err error) { + err = withTemp(r, func(f string) (err error) { + sectorID, err = s.sb.AddPiece(ref, size, f) + return err + }) + if err != nil { + return 0, err + } + s.lk.Lock() + _, exists := s.waiting[sectorID] + if !exists { + s.waiting[sectorID] = make(chan struct{}) + } + s.lk.Unlock() + return sectorID, nil +} + +func (s *Store) CloseIncoming(c <-chan sectorbuilder.SectorSealingStatus) { + s.lk.Lock() + var at = -1 + for i, ch := range s.incoming { + if ch == c { + at = i + } + } + if at == -1 { + s.lk.Unlock() + return + } + if len(s.incoming) > 1 { + s.incoming[at] = s.incoming[len(s.incoming)-1] + } + s.incoming = s.incoming[:len(s.incoming)-1] + s.lk.Unlock() +} + +func (s *Store) Incoming() <-chan sectorbuilder.SectorSealingStatus { + ch := make(chan sectorbuilder.SectorSealingStatus, 8) + s.lk.Lock() + s.incoming = append(s.incoming, ch) + s.lk.Unlock() + return ch +} + +func (s *Store) WaitSeal(ctx context.Context, sector uint64) (sectorbuilder.SectorSealingStatus, error) { + s.lk.Lock() + watch, ok := s.waiting[sector] + s.lk.Unlock() + if ok { + select { + case <-watch: + case <-ctx.Done(): + return sectorbuilder.SectorSealingStatus{}, ctx.Err() + } + } + + return s.sb.SealStatus(sector) +} + +func (s *Store) Stop() { + close(s.close) +} + +func withTemp(r io.Reader, cb func(string) error) error { + f, err := ioutil.TempFile(os.TempDir(), "lotus-temp-") + if err != nil { + return err + } + if _, err := io.Copy(f, r); err != nil { + return err + } + if err := f.Close(); err != nil { + return err + } + + err = cb(f.Name()) + if err != nil { + os.Remove(f.Name()) + return err + } + + return os.Remove(f.Name()) +}