very basic sector seal scheduling

This commit is contained in:
Łukasz Magiera 2019-10-27 09:56:53 +01:00
parent 5257b1cce1
commit 874be79958
12 changed files with 71 additions and 40 deletions

View File

@ -167,9 +167,6 @@ type StorageMiner interface {
// List all staged sectors
SectorsList(context.Context) ([]uint64, error)
// Seal all staged sectors
SectorsStagedSeal(context.Context) error
SectorsRefs(context.Context) (map[string][]SealedRef, error)
}

View File

@ -131,7 +131,6 @@ type StorageMinerStruct struct {
SectorsStatus func(context.Context, uint64) (sectorbuilder.SectorSealingStatus, error) `perm:"read"`
SectorsList func(context.Context) ([]uint64, error) `perm:"read"`
SectorsStagedSeal func(context.Context) error `perm:"write"`
SectorsRefs func(context.Context) (map[string][]SealedRef, error) `perm:"read"`
}
@ -476,11 +475,6 @@ func (c *StorageMinerStruct) SectorsList(ctx context.Context) ([]uint64, error)
return c.Internal.SectorsList(ctx)
}
// Seal all staged sectors
func (c *StorageMinerStruct) SectorsStagedSeal(ctx context.Context) error {
return c.Internal.SectorsStagedSeal(ctx)
}
func (c *StorageMinerStruct) SectorsRefs(ctx context.Context) (map[string][]SealedRef, error) {
return c.Internal.SectorsRefs(ctx)
}

View File

@ -273,6 +273,10 @@ func (p *Provider) sealing(ctx context.Context, deal MinerDeal) (func(*MinerDeal
log.Warnf("Sending deal response failed: %s", err)
}
if err := p.secst.SealSector(ctx, deal.SectorID); err != nil {
return nil, xerrors.Errorf("sealing sector failed: %w", err)
}
_, err = p.waitSealed(ctx, deal)
if err != nil {
return nil, err

View File

@ -36,7 +36,6 @@ var sectorsCmd = &cli.Command{
Subcommands: []*cli.Command{
sectorsStatusCmd,
sectorsStagedListCmd,
sectorsStagedSealCmd,
sectorsRefsCmd,
},
}
@ -102,21 +101,6 @@ var sectorsStagedListCmd = &cli.Command{
},
}
var sectorsStagedSealCmd = &cli.Command{
Name: "seal-staged", // TODO: nest this under a 'staged' subcommand? idk
Usage: "Seal staged sectors",
Action: func(cctx *cli.Context) error {
nodeApi, closer, err := lcli.GetStorageMinerAPI(cctx)
if err != nil {
return err
}
defer closer()
ctx := lcli.ReqContext(cctx)
return nodeApi.SectorsStagedSeal(ctx)
},
}
var sectorsRefsCmd = &cli.Command{
Name: "refs",
Usage: "List References to sectors",

View File

@ -25,6 +25,10 @@ type SortedSectorInfo = sectorbuilder.SortedSectorInfo
type SectorInfo = sectorbuilder.SectorInfo
type SealTicket = sectorbuilder.SealTicket
type SealedSectorMetadata = sectorbuilder.SealedSectorMetadata
const CommLen = sectorbuilder.CommitmentBytesLen
type SectorBuilder struct {
@ -87,10 +91,8 @@ func (sb *SectorBuilder) ReadPieceFromSealedSector(pieceKey string) ([]byte, err
return sectorbuilder.ReadPieceFromSealedSector(sb.handle, pieceKey)
}
func (sb *SectorBuilder) SealAllStagedSectors() error {
panic("dont call this")
_, err := sectorbuilder.SealAllStagedSectors(sb.handle, sectorbuilder.SealTicket{})
return err
func (sb *SectorBuilder) SealSector(sectorID uint64, ticket SealTicket) (SealedSectorMetadata, error) {
return sectorbuilder.SealSector(sb.handle, sectorID, ticket)
}
func (sb *SectorBuilder) SealStatus(sector uint64) (SectorSealingStatus, error) {

View File

@ -47,7 +47,8 @@ func TestSealAndVerify(t *testing.T) {
t.Fatal(err)
}
store := sector.NewStore(sb, datastore.NewMapDatastore())
// TODO: Consider fixing
store := sector.NewStore(sb, datastore.NewMapDatastore(), nil)
store.Service()
ssinfo := <-store.Incoming()

View File

@ -13,6 +13,8 @@ let sealCodes = [
"Failed",
"Sealing",
"Sealed",
"Paused",
"ReadyForSealing",
]
class StorageNode extends React.Component {
@ -122,7 +124,7 @@ class StorageNode extends React.Component {
<div>{this.state.statusCounts.map((c, i) => <span key={i}>{sealCodes[i]}: {c} | </span>)}</div>
<div>
{this.state.staged ? this.state.staged.map((s, i) => (
<div key={i}>{s.SectorID} {sealCodes[s.State]}</div>
<div key={i}>{s.SectorID} {sealCodes[s.State] || `unk ${s.State}`}</div>
)) : <div/>}
</div>

View File

@ -235,6 +235,7 @@ func Online() Option {
Override(new(*sector.Store), sector.NewStore),
Override(new(*sectorblocks.SectorBlocks), sectorblocks.NewSectorBlocks),
Override(new(*commitment.Tracker), commitment.NewTracker),
Override(new(sector.TicketFn), modules.SealTicketGen),
Override(new(*storage.Miner), modules.StorageMiner),
Override(new(dtypes.StagingDAG), modules.StagingDAG),

View File

@ -40,11 +40,15 @@ func (sm *StorageMinerAPI) StoreGarbageData(ctx context.Context) (uint64, error)
// TODO: create a deal
name := fmt.Sprintf("fake-file-%d", rand.Intn(100000000))
sectorId, err := sm.Sectors.AddPiece(name, size, io.LimitReader(rand.New(rand.NewSource(42)), int64(size)), 0)
sectorId, err := sm.Sectors.AddPiece(name, size, io.LimitReader(rand.New(rand.NewSource(42)), int64(size)))
if err != nil {
return 0, err
}
if err := sm.Sectors.SealSector(ctx, sectorId); err != nil {
return sectorId, err
}
return sectorId, err
}
@ -57,11 +61,6 @@ func (sm *StorageMinerAPI) SectorsList(context.Context) ([]uint64, error) {
return sm.SectorBuilder.GetAllStagedSectors()
}
// Seal all staged sectors
func (sm *StorageMinerAPI) SectorsStagedSeal(context.Context) error {
return sm.SectorBuilder.SealAllStagedSectors()
}
func (sm *StorageMinerAPI) SectorsRefs(context.Context) (map[string][]api.SealedRef, error) {
// json can't handle cids as map keys
out := map[string][]api.SealedRef{}

View File

@ -14,8 +14,10 @@ import (
"github.com/libp2p/go-libp2p-core/routing"
"github.com/mitchellh/go-homedir"
"go.uber.org/fx"
"golang.org/x/xerrors"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain/address"
"github.com/filecoin-project/lotus/chain/deals"
"github.com/filecoin-project/lotus/lib/sectorbuilder"
@ -161,3 +163,28 @@ func RegisterMiner(lc fx.Lifecycle, ds dtypes.MetadataDS, api api.FullNode) erro
})
return nil
}
func SealTicketGen(api api.FullNode) sector.TicketFn {
return func(ctx context.Context) (*sectorbuilder.SealTicket, error) {
ts, err := api.ChainHead(ctx)
if err != nil {
return nil, xerrors.Errorf("getting head ts for SealTicket failed: %w", err)
}
r, err := api.ChainGetRandomness(ctx, ts, nil, build.PoSTChallangeTime)
if err != nil {
return nil, xerrors.Errorf("getting randomness for SealTicket failed: %w", err)
}
var tkt [sectorbuilder.CommLen]byte
if n := copy(tkt[:], r); n != sectorbuilder.CommLen {
return nil, xerrors.Errorf("unexpected randomness len: %d (expected %d)", n, sectorbuilder.CommLen)
}
return &sectorbuilder.SealTicket{
BlockHeight: ts.Height(),
TicketBytes: tkt,
}, nil
}
}

View File

@ -144,6 +144,7 @@ func (m *Miner) commitSector(ctx context.Context, sinfo sectorbuilder.SectorSeal
CommD: sinfo.CommD[:],
CommR: sinfo.CommR[:],
Proof: sinfo.Proof,
Epoch: sinfo.Ticket.BlockHeight,
DealIDs: deals,
SectorNumber: sinfo.SectorID,

View File

@ -32,11 +32,14 @@ type dealMapping struct {
Committed bool
}
type TicketFn func(context.Context) (*sectorbuilder.SealTicket, error)
// TODO: eventually handle sector storage here instead of in rust-sectorbuilder
type Store struct {
waitingLk sync.Mutex
sb *sectorbuilder.SectorBuilder
tktFn TicketFn
dealsLk sync.Mutex
deals datastore.Datastore
@ -48,9 +51,10 @@ type Store struct {
closeCh chan struct{}
}
func NewStore(sb *sectorbuilder.SectorBuilder, ds dtypes.MetadataDS) *Store {
func NewStore(sb *sectorbuilder.SectorBuilder, ds dtypes.MetadataDS, tktFn TicketFn) *Store {
return &Store{
sb: sb,
tktFn:tktFn,
deals: namespace.Wrap(ds, sectorDealsPrefix),
waiting: map[uint64]chan struct{}{},
closeCh: make(chan struct{}),
@ -121,7 +125,7 @@ func (s *Store) service() {
}
}
func (s *Store) AddPiece(ref string, size uint64, r io.Reader, dealID uint64) (sectorID uint64, err error) {
func (s *Store) AddPiece(ref string, size uint64, r io.Reader, dealIDs ...uint64) (sectorID uint64, err error) {
sectorID, err = s.sb.AddPiece(ref, size, r)
if err != nil {
return 0, err
@ -150,7 +154,7 @@ func (s *Store) AddPiece(ref string, size uint64, r io.Reader, dealID uint64) (s
}
fallthrough
case datastore.ErrNotFound:
deals.DealIDs = append(deals.DealIDs, dealID)
deals.DealIDs = append(deals.DealIDs, dealIDs...)
d, err := cbor.DumpObject(&deals)
if err != nil {
return 0, err
@ -200,6 +204,21 @@ func (s *Store) DealsForCommit(sectorID uint64) ([]uint64, error) {
}
}
func (s *Store) SealSector(ctx context.Context, sectorID uint64) error {
tkt, err := s.tktFn(ctx)
if err != nil {
return err
}
// TODO: That's not async, is it?
// - If not then we probably can drop this wait-for-seal hack below
_, err = s.sb.SealSector(sectorID, *tkt)
if err != nil {
return err
}
return nil
}
func (s *Store) CloseIncoming(c <-chan sectorbuilder.SectorSealingStatus) {
s.waitingLk.Lock()
var at = -1