Sector store

This commit is contained in:
Łukasz Magiera 2019-08-14 22:27:10 +02:00
parent e050d56594
commit 399f91940b
8 changed files with 207 additions and 59 deletions

View File

@ -6,8 +6,8 @@ import (
"github.com/filecoin-project/go-lotus/api" "github.com/filecoin-project/go-lotus/api"
"github.com/filecoin-project/go-lotus/chain/address" "github.com/filecoin-project/go-lotus/chain/address"
"github.com/filecoin-project/go-lotus/lib/sectorbuilder"
"github.com/filecoin-project/go-lotus/node/modules/dtypes" "github.com/filecoin-project/go-lotus/node/modules/dtypes"
"github.com/filecoin-project/go-lotus/storage/sector"
"github.com/ipfs/go-cid" "github.com/ipfs/go-cid"
"github.com/ipfs/go-datastore" "github.com/ipfs/go-datastore"
@ -35,8 +35,8 @@ type MinerDeal struct {
} }
type Handler struct { type Handler struct {
sb *sectorbuilder.SectorBuilder secst *sector.Store
full api.FullNode full api.FullNode
// TODO: Use a custom protocol or graphsync in the future // TODO: Use a custom protocol or graphsync in the future
// TODO: GC // TODO: GC
@ -60,7 +60,7 @@ type dealUpdate struct {
mut func(*MinerDeal) mut func(*MinerDeal)
} }
func NewHandler(ds dtypes.MetadataDS, sb *sectorbuilder.SectorBuilder, dag dtypes.StagingDAG, fullNode api.FullNode) (*Handler, error) { func NewHandler(ds dtypes.MetadataDS, secst *sector.Store, dag dtypes.StagingDAG, fullNode api.FullNode) (*Handler, error) {
addr, err := ds.Get(datastore.NewKey("miner-address")) addr, err := ds.Get(datastore.NewKey("miner-address"))
if err != nil { if err != nil {
return nil, err return nil, err
@ -71,9 +71,9 @@ func NewHandler(ds dtypes.MetadataDS, sb *sectorbuilder.SectorBuilder, dag dtype
} }
return &Handler{ return &Handler{
sb: sb, secst: secst,
dag: dag, dag: dag,
full: fullNode, full: fullNode,
conns: map[cid.Cid]inet.Stream{}, conns: map[cid.Cid]inet.Stream{},

View File

@ -2,8 +2,6 @@ package deals
import ( import (
"context" "context"
"time"
"github.com/filecoin-project/go-lotus/lib/sectorbuilder" "github.com/filecoin-project/go-lotus/lib/sectorbuilder"
files "github.com/ipfs/go-ipfs-files" files "github.com/ipfs/go-ipfs-files"
@ -88,11 +86,7 @@ func (h *Handler) staged(ctx context.Context, deal MinerDeal) (func(*MinerDeal),
return nil, xerrors.Errorf("failed to get file size: %s", err) return nil, xerrors.Errorf("failed to get file size: %s", err)
} }
var sectorID uint64 sectorID, err := h.secst.AddPiece(deal.Proposal.PieceRef, uint64(size), uf, deal.Proposal.Duration)
err = withTemp(uf, func(f string) (err error) {
sectorID, err = h.sb.AddPiece(deal.Proposal.PieceRef, uint64(size), f)
return err
})
if err != nil { if err != nil {
return nil, xerrors.Errorf("AddPiece failed: %s", err) return nil, xerrors.Errorf("AddPiece failed: %s", err)
} }
@ -117,37 +111,29 @@ func getInclusionProof(ref string, status sectorbuilder.SectorSealingStatus) (Pi
return PieceInclusionProof{}, xerrors.Errorf("pieceInclusionProof for %s in sector %d not found", ref, status.SectorID) return PieceInclusionProof{}, xerrors.Errorf("pieceInclusionProof for %s in sector %d not found", ref, status.SectorID)
} }
func (h *Handler) pollSectorSealed(deal MinerDeal) (status sectorbuilder.SectorSealingStatus, err error) { func (h *Handler) waitSealed(deal MinerDeal) (sectorbuilder.SectorSealingStatus, error) {
loop: status, err := h.secst.WaitSeal(context.TODO(), deal.SectorID)
for { if err != nil {
status, err = h.sb.SealStatus(deal.SectorID) return sectorbuilder.SectorSealingStatus{}, err
if err != nil {
return sectorbuilder.SectorSealingStatus{}, err
}
switch status.SealStatusCode {
case 0: // sealed
break loop
case 2: // failed
return sectorbuilder.SectorSealingStatus{}, xerrors.Errorf("sealing sector %d for deal %s (ref=%s) failed: %s", deal.SectorID, deal.ProposalCid, deal.Ref, status.SealErrorMsg)
case 1: // pending
if err := h.sb.SealAllStagedSectors(); err != nil {
return sectorbuilder.SectorSealingStatus{}, err
}
// start seal
fallthrough
case 3: // sealing
// wait
default:
return sectorbuilder.SectorSealingStatus{}, xerrors.Errorf("unknown SealStatusCode: %d", status.SectorID)
}
time.Sleep(3 * time.Second)
} }
switch status.SealStatusCode {
case 0: // sealed
case 2: // failed
return sectorbuilder.SectorSealingStatus{}, xerrors.Errorf("sealing sector %d for deal %s (ref=%s) failed: %s", deal.SectorID, deal.ProposalCid, deal.Ref, status.SealErrorMsg)
case 1: // pending
return sectorbuilder.SectorSealingStatus{}, xerrors.Errorf("sector status was 'pending' after call to WaitSeal (for sector %d)", deal.SectorID)
case 3: // sealing
return sectorbuilder.SectorSealingStatus{}, xerrors.Errorf("sector status was 'wait' after call to WaitSeal (for sector %d)", deal.SectorID)
default:
return sectorbuilder.SectorSealingStatus{}, xerrors.Errorf("unknown SealStatusCode: %d", status.SectorID)
}
return status, nil return status, nil
} }
func (h *Handler) sealing(ctx context.Context, deal MinerDeal) (func(*MinerDeal), error) { func (h *Handler) sealing(ctx context.Context, deal MinerDeal) (func(*MinerDeal), error) {
status, err := h.pollSectorSealed(deal) status, err := h.waitSealed(deal)
if err != nil { if err != nil {
return nil, err return nil, err
} }

View File

@ -35,6 +35,7 @@ import (
"github.com/filecoin-project/go-lotus/node/repo" "github.com/filecoin-project/go-lotus/node/repo"
"github.com/filecoin-project/go-lotus/paych" "github.com/filecoin-project/go-lotus/paych"
"github.com/filecoin-project/go-lotus/storage" "github.com/filecoin-project/go-lotus/storage"
"github.com/filecoin-project/go-lotus/storage/sector"
) )
// special is a type used to give keys to modules which // special is a type used to give keys to modules which
@ -74,7 +75,10 @@ const (
HandleIncomingMessagesKey HandleIncomingMessagesKey
RunDealClientKey RunDealClientKey
// storage miner
HandleDealsKey HandleDealsKey
RunSectorServiceKey
// daemon // daemon
ExtractApiKey ExtractApiKey
@ -223,12 +227,14 @@ func Online() Option {
// Storage miner // Storage miner
ApplyIf(func(s *Settings) bool { return s.nodeType == nodeStorageMiner }, ApplyIf(func(s *Settings) bool { return s.nodeType == nodeStorageMiner },
Override(new(*sectorbuilder.SectorBuilder), modules.SectorBuilder), Override(new(*sectorbuilder.SectorBuilder), modules.SectorBuilder),
Override(new(*sector.Store), sector.NewStore),
Override(new(*storage.Miner), modules.StorageMiner), Override(new(*storage.Miner), modules.StorageMiner),
Override(new(dtypes.StagingDAG), modules.StagingDAG), Override(new(dtypes.StagingDAG), modules.StagingDAG),
Override(new(*deals.Handler), deals.NewHandler), Override(new(*deals.Handler), deals.NewHandler),
Override(HandleDealsKey, modules.HandleDeals), Override(HandleDealsKey, modules.HandleDeals),
Override(RunSectorServiceKey, modules.RunSectorService),
), ),
) )
} }

View File

@ -4,7 +4,8 @@ import (
"context" "context"
"fmt" "fmt"
"github.com/filecoin-project/go-lotus/chain/address" "github.com/filecoin-project/go-lotus/chain/address"
"io/ioutil" "github.com/filecoin-project/go-lotus/storage/sector"
"io"
"math/rand" "math/rand"
"github.com/filecoin-project/go-lotus/api" "github.com/filecoin-project/go-lotus/api"
@ -17,6 +18,7 @@ type StorageMinerAPI struct {
SectorBuilderConfig *sectorbuilder.SectorBuilderConfig SectorBuilderConfig *sectorbuilder.SectorBuilderConfig
SectorBuilder *sectorbuilder.SectorBuilder SectorBuilder *sectorbuilder.SectorBuilder
Sectors *sector.Store
Miner *storage.Miner Miner *storage.Miner
} }
@ -26,20 +28,10 @@ func (sm *StorageMinerAPI) ActorAddresses(context.Context) ([]address.Address, e
} }
func (sm *StorageMinerAPI) StoreGarbageData(ctx context.Context) (uint64, error) { func (sm *StorageMinerAPI) StoreGarbageData(ctx context.Context) (uint64, error) {
maxSize := uint64(1016) // this is the most data we can fit in a 1024 byte sector size := uint64(1016) // this is the most data we can fit in a 1024 byte sector
data := make([]byte, maxSize)
fi, err := ioutil.TempFile("", "lotus-garbage")
if err != nil {
return 0, err
}
if _, err := fi.Write(data); err != nil {
return 0, err
}
fi.Close()
name := fmt.Sprintf("fake-file-%d", rand.Intn(100000000)) name := fmt.Sprintf("fake-file-%d", rand.Intn(100000000))
sectorId, err := sm.SectorBuilder.AddPiece(name, maxSize, fi.Name()) sectorId, err := sm.Sectors.AddPiece(name, size, io.LimitReader(rand.New(rand.NewSource(42)), 1016), 0)
if err != nil { if err != nil {
return 0, err return 0, err
} }

View File

@ -2,6 +2,7 @@ package modules
import ( import (
"context" "context"
"github.com/filecoin-project/go-lotus/storage/sector"
"github.com/libp2p/go-libp2p-core/host" "github.com/libp2p/go-libp2p-core/host"
inet "github.com/libp2p/go-libp2p-core/network" inet "github.com/libp2p/go-libp2p-core/network"
@ -69,3 +70,16 @@ func RunDealClient(lc fx.Lifecycle, c *deals.Client) {
}, },
}) })
} }
func RunSectorService(lc fx.Lifecycle, secst *sector.Store) {
lc.Append(fx.Hook{
OnStart: func(context.Context) error {
secst.Service()
return nil
},
OnStop: func(context.Context) error {
secst.Stop()
return nil
},
})
}

View File

@ -2,6 +2,7 @@ package modules
import ( import (
"context" "context"
"github.com/filecoin-project/go-lotus/storage/sector"
"path/filepath" "path/filepath"
"github.com/ipfs/go-bitswap" "github.com/ipfs/go-bitswap"
@ -81,13 +82,13 @@ func SectorBuilder(mctx helpers.MetricsCtx, lc fx.Lifecycle, sbc *sectorbuilder.
return sb, nil return sb, nil
} }
func StorageMiner(mctx helpers.MetricsCtx, lc fx.Lifecycle, api api.FullNode, h host.Host, ds dtypes.MetadataDS, sb *sectorbuilder.SectorBuilder) (*storage.Miner, error) { func StorageMiner(mctx helpers.MetricsCtx, lc fx.Lifecycle, api api.FullNode, h host.Host, ds dtypes.MetadataDS, secst *sector.Store) (*storage.Miner, error) {
maddr, err := minerAddrFromDS(ds) maddr, err := minerAddrFromDS(ds)
if err != nil { if err != nil {
return nil, err return nil, err
} }
sm, err := storage.NewMiner(api, maddr, h, ds, sb) sm, err := storage.NewMiner(api, maddr, h, ds, secst)
if err != nil { if err != nil {
return nil, err return nil, err
} }

View File

@ -3,6 +3,7 @@ package storage
import ( import (
"context" "context"
"fmt" "fmt"
"github.com/filecoin-project/go-lotus/storage/sector"
"github.com/filecoin-project/go-lotus/api" "github.com/filecoin-project/go-lotus/api"
"github.com/filecoin-project/go-lotus/chain/actors" "github.com/filecoin-project/go-lotus/chain/actors"
@ -23,7 +24,7 @@ var log = logging.Logger("storageminer")
type Miner struct { type Miner struct {
api storageMinerApi api storageMinerApi
sb *sectorbuilder.SectorBuilder secst *sector.Store
maddr address.Address maddr address.Address
@ -52,13 +53,13 @@ type storageMinerApi interface {
WalletHas(context.Context, address.Address) (bool, error) WalletHas(context.Context, address.Address) (bool, error)
} }
func NewMiner(api storageMinerApi, addr address.Address, h host.Host, ds datastore.Batching, sb *sectorbuilder.SectorBuilder) (*Miner, error) { func NewMiner(api storageMinerApi, addr address.Address, h host.Host, ds datastore.Batching, secst *sector.Store) (*Miner, error) {
return &Miner{ return &Miner{
api: api, api: api,
maddr: addr, maddr: addr,
h: h, h: h,
ds: ds, ds: ds,
sb: sb, secst: secst,
}, nil }, nil
} }
@ -73,9 +74,12 @@ func (m *Miner) Run(ctx context.Context) error {
} }
func (m *Miner) handlePostingSealedSectors(ctx context.Context) { func (m *Miner) handlePostingSealedSectors(ctx context.Context) {
incoming := m.secst.Incoming()
defer m.secst.CloseIncoming(incoming)
for { for {
select { select {
case sinfo, ok := <-m.sb.SealedSectorChan(): case sinfo, ok := <-incoming:
if !ok { if !ok {
// TODO: set some state variable so that this state can be // TODO: set some state variable so that this state can be
// visible via some status command // visible via some status command

145
storage/sector/store.go Normal file
View File

@ -0,0 +1,145 @@
package sector
import (
"context"
"github.com/filecoin-project/go-lotus/lib/sectorbuilder"
"io"
"io/ioutil"
"os"
"sync"
)
// TODO: eventually handle sector storage here instead of in rust-sectorbuilder
type Store struct {
lk sync.Mutex
sb *sectorbuilder.SectorBuilder
waiting map[uint64]chan struct{}
incoming []chan sectorbuilder.SectorSealingStatus
// TODO: outdated chan
close chan struct{}
}
func NewStore(sb *sectorbuilder.SectorBuilder) *Store {
return &Store{
sb: sb,
waiting: map[uint64]chan struct{}{},
close: make(chan struct{}),
}
}
func (s *Store) Service() {
go s.service()
}
func (s *Store) service() {
sealed := s.sb.SealedSectorChan()
for {
select {
case sector := <-sealed:
s.lk.Lock()
watch, ok := s.waiting[sector.SectorID]
if ok {
close(watch)
delete(s.waiting, sector.SectorID)
}
for _, c := range s.incoming {
c <- sector // TODO: ctx!
}
s.lk.Unlock()
case <-s.close:
s.lk.Lock()
for _, c := range s.incoming {
close(c)
}
s.lk.Unlock()
return
}
}
}
func (s *Store) AddPiece(ref string, size uint64, r io.Reader, keepAtLeast uint64) (sectorID uint64, err error) {
err = withTemp(r, func(f string) (err error) {
sectorID, err = s.sb.AddPiece(ref, size, f)
return err
})
if err != nil {
return 0, err
}
s.lk.Lock()
_, exists := s.waiting[sectorID]
if !exists {
s.waiting[sectorID] = make(chan struct{})
}
s.lk.Unlock()
return sectorID, nil
}
func (s *Store) CloseIncoming(c <-chan sectorbuilder.SectorSealingStatus) {
s.lk.Lock()
var at = -1
for i, ch := range s.incoming {
if ch == c {
at = i
}
}
if at == -1 {
s.lk.Unlock()
return
}
if len(s.incoming) > 1 {
s.incoming[at] = s.incoming[len(s.incoming)-1]
}
s.incoming = s.incoming[:len(s.incoming)-1]
s.lk.Unlock()
}
func (s *Store) Incoming() <-chan sectorbuilder.SectorSealingStatus {
ch := make(chan sectorbuilder.SectorSealingStatus, 8)
s.lk.Lock()
s.incoming = append(s.incoming, ch)
s.lk.Unlock()
return ch
}
func (s *Store) WaitSeal(ctx context.Context, sector uint64) (sectorbuilder.SectorSealingStatus, error) {
s.lk.Lock()
watch, ok := s.waiting[sector]
s.lk.Unlock()
if ok {
select {
case <-watch:
case <-ctx.Done():
return sectorbuilder.SectorSealingStatus{}, ctx.Err()
}
}
return s.sb.SealStatus(sector)
}
func (s *Store) Stop() {
close(s.close)
}
func withTemp(r io.Reader, cb func(string) error) error {
f, err := ioutil.TempFile(os.TempDir(), "lotus-temp-")
if err != nil {
return err
}
if _, err := io.Copy(f, r); err != nil {
return err
}
if err := f.Close(); err != nil {
return err
}
err = cb(f.Name())
if err != nil {
os.Remove(f.Name())
return err
}
return os.Remove(f.Name())
}