sealmgr: Interfaces; simple initial impl

This commit is contained in:
Łukasz Magiera 2020-03-03 01:45:32 +01:00
parent 98dbb2f70f
commit 3abb59a550
6 changed files with 272 additions and 50 deletions

View File

@ -1,12 +1,14 @@
package api
import (
"bytes"
"context"
"github.com/ipfs/go-cid"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/specs-actors/actors/abi"
"github.com/ipfs/go-cid"
"github.com/filecoin-project/lotus/storage/sealmgr"
)
// alias because cbor-gen doesn't like non-alias types
@ -103,30 +105,14 @@ type StorageMiner interface {
SectorsUpdate(context.Context, abi.SectorNumber, SectorState) error
WorkerStats(context.Context) (WorkerStats, error)
/*WorkerStats(context.Context) (sealsched.WorkerStats, error)*/
// WorkerQueue registers a remote worker
/*// WorkerQueue registers a remote worker
WorkerQueue(context.Context, WorkerCfg) (<-chan WorkerTask, error)
WorkerDone(ctx context.Context, task uint64, res SealRes) error
WorkerDone(ctx context.Context, task uint64, res SealRes) error*/
}
type WorkerStats struct {
Total int
Free int
AddPieceWait int
PreCommitWait int
CommitWait int
UnsealWait int
}
type WorkerTask struct {
}
type WorkerCfg struct {
NoPreCommit bool
NoCommit bool
}
type SealRes struct {
Err string
GoErr error `json:"-"`
@ -150,8 +136,8 @@ type SectorInfo struct {
CommR *cid.Cid
Proof []byte
Deals []abi.DealID
Ticket SealTicket
Seed SealSeed
Ticket sealmgr.SealTicket
Seed sealmgr.SealSeed
Retries uint64
LastErr string
@ -168,21 +154,3 @@ type SealedRef struct {
type SealedRefs struct {
Refs []SealedRef
}
type SealTicket struct {
Value abi.SealRandomness
Epoch abi.ChainEpoch
}
type SealSeed struct {
Value abi.InteractiveSealRandomness
Epoch abi.ChainEpoch
}
func (st *SealTicket) Equals(ost *SealTicket) bool {
return bytes.Equal(st.Value, ost.Value) && st.Epoch == ost.Epoch
}
func (st *SealSeed) Equals(ost *SealSeed) bool {
return bytes.Equal(st.Value, ost.Value) && st.Epoch == ost.Epoch
}

View File

@ -9,7 +9,6 @@ import (
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-fil-markets/storagemarket"
sectorbuilder "github.com/filecoin-project/go-sectorbuilder"
"github.com/filecoin-project/specs-actors/actors/abi"
"github.com/filecoin-project/specs-actors/actors/abi/big"
"github.com/filecoin-project/specs-actors/actors/builtin/miner"
@ -168,10 +167,10 @@ type StorageMinerStruct struct {
SectorsRefs func(context.Context) (map[string][]api.SealedRef, error) `perm:"read"`
SectorsUpdate func(context.Context, abi.SectorNumber, api.SectorState) error `perm:"write"`
WorkerStats func(context.Context) (sectorbuilder.WorkerStats, error) `perm:"read"`
WorkerQueue func(ctx context.Context, cfg sectorbuilder.WorkerCfg) (<-chan sectorbuilder.WorkerTask, error) `perm:"admin"` // TODO: worker perm
WorkerDone func(ctx context.Context, task uint64, res sectorbuilder.SealRes) error `perm:"admin"`
/* WorkerStats func(context.Context) (sealsched.WorkerStats, error) `perm:"read"`
*/
/* WorkerQueue func(ctx context.Context, cfg sealsched.WorkerCfg) (<-chan sealsched.WorkerTask, error) `perm:"admin"` // TODO: worker perm
WorkerDone func(ctx context.Context, task uint64, res sealsched.SealRes) error `perm:"admin"`*/
}
}
@ -601,17 +600,17 @@ func (c *StorageMinerStruct) SectorsUpdate(ctx context.Context, id abi.SectorNum
return c.Internal.SectorsUpdate(ctx, id, state)
}
func (c *StorageMinerStruct) WorkerStats(ctx context.Context) (sectorbuilder.WorkerStats, error) {
/*func (c *StorageMinerStruct) WorkerStats(ctx context.Context) (sealsched.WorkerStats, error) {
return c.Internal.WorkerStats(ctx)
}
}*/
func (c *StorageMinerStruct) WorkerQueue(ctx context.Context, cfg sectorbuilder.WorkerCfg) (<-chan sectorbuilder.WorkerTask, error) {
/*func (c *StorageMinerStruct) WorkerQueue(ctx context.Context, cfg sectorbuilder.WorkerCfg) (<-chan sectorbuilder.WorkerTask, error) {
return c.Internal.WorkerQueue(ctx, cfg)
}
func (c *StorageMinerStruct) WorkerDone(ctx context.Context, task uint64, res sectorbuilder.SealRes) error {
return c.Internal.WorkerDone(ctx, task, res)
}
}*/
var _ api.Common = &CommonStruct{}
var _ api.FullNode = &FullNodeStruct{}

92
storage/sealmgr/simple.go Normal file
View File

@ -0,0 +1,92 @@
package sealmgr
import (
"context"
"io"
"sync"
"github.com/ipfs/go-cid"
"golang.org/x/xerrors"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-fil-markets/storedcounter"
"github.com/filecoin-project/go-sectorbuilder"
"github.com/filecoin-project/specs-actors/actors/abi"
ffi "github.com/filecoin-project/filecoin-ffi"
)
// Simple implements a very basic storage manager which has one local worker,
// running one thing locally
type Simple struct {
sc *storedcounter.StoredCounter
maddr address.Address
rateLimiter sync.Mutex
worker Worker
}
func NewSimpleManager(sc *storedcounter.StoredCounter, maddr address.Address, sb sectorbuilder.Basic) (*Simple, error) {
mid, err := address.IDFromAddress(maddr)
if err != nil {
return nil, xerrors.Errorf("get miner id: %w", err)
}
w := &LocalWorker{
sealer: sb,
mid: abi.ActorID(mid),
}
return &Simple{
sc: sc,
maddr: maddr,
worker: w,
}, nil
}
func (s *Simple) NewSector() (SectorInfo, error) {
n, err := s.sc.Next()
if err != nil {
return SectorInfo{}, xerrors.Errorf("acquire sector number: %w", err)
}
mid, err := address.IDFromAddress(s.maddr)
if err != nil {
return SectorInfo{}, xerrors.Errorf("get miner id: %w", err)
}
return SectorInfo{
ID: abi.SectorID{
Miner: abi.ActorID(mid),
Number: abi.SectorNumber(n),
},
}, nil
}
func (s *Simple) AddPiece(ctx context.Context, si SectorInfo, sz abi.UnpaddedPieceSize, r io.Reader) (cid.Cid, SectorInfo, error) {
s.rateLimiter.Lock()
defer s.rateLimiter.Unlock()
return s.worker.AddPiece(ctx, si, sz, r)
}
func (s *Simple) RunSeal(ctx context.Context, task TaskType, si SectorInfo) (SectorInfo, error) {
s.rateLimiter.Lock()
defer s.rateLimiter.Unlock()
return s.worker.Run(ctx, task, si)
}
func (s *Simple) GenerateEPostCandidates(sectorInfo []abi.SectorInfo, challengeSeed abi.PoStRandomness, faults []abi.SectorNumber) ([]ffi.PoStCandidateWithTicket, error) {
return s.worker.(*LocalWorker).sealer.GenerateEPostCandidates(sectorInfo, challengeSeed, faults)
}
func (s *Simple) GenerateFallbackPoSt(sectorInfo []abi.SectorInfo, challengeSeed abi.PoStRandomness, faults []abi.SectorNumber) ([]ffi.PoStCandidateWithTicket, []abi.PoStProof, error) {
return s.worker.(*LocalWorker).sealer.GenerateFallbackPoSt(sectorInfo, challengeSeed, faults)
}
func (s *Simple) ComputeElectionPoSt(sectorInfo []abi.SectorInfo, challengeSeed abi.PoStRandomness, winners []abi.PoStCandidate) ([]abi.PoStProof, error) {
return s.worker.(*LocalWorker).sealer.ComputeElectionPoSt(sectorInfo, challengeSeed, winners)
}
var _ Manager = &Simple{}

9
storage/sealmgr/task.go Normal file
View File

@ -0,0 +1,9 @@
package sealmgr
type TaskType string
const (
TTPreCommit1 TaskType = "seal/v0/precommit/1"
TTPreCommit2 TaskType = "seal/v0/precommit/2" // Commit1 is called here too
TTCommit2 TaskType = "seal/v0/commit/2"
)

80
storage/sealmgr/types.go Normal file
View File

@ -0,0 +1,80 @@
package sealmgr
import (
"bytes"
"context"
"io"
"github.com/ipfs/go-cid"
"github.com/filecoin-project/go-sectorbuilder"
"github.com/filecoin-project/specs-actors/actors/abi"
)
type SealTicket struct {
Value abi.SealRandomness
Epoch abi.ChainEpoch
}
type SealSeed struct {
Value abi.InteractiveSealRandomness
Epoch abi.ChainEpoch
}
func (st *SealTicket) Equals(ost *SealTicket) bool {
return bytes.Equal(st.Value, ost.Value) && st.Epoch == ost.Epoch
}
func (st *SealSeed) Equals(ost *SealSeed) bool {
return bytes.Equal(st.Value, ost.Value) && st.Epoch == ost.Epoch
}
// SectorInfo holds all sector-related metadata
type SectorInfo struct {
ID abi.SectorID
Pieces []abi.PieceInfo
Ticket SealTicket
Seed SealSeed
PreCommit1Out []byte
Sealed *cid.Cid
Unsealed *cid.Cid
CommitInput []byte
Proof []byte
}
func (si SectorInfo) PieceSizes() []abi.UnpaddedPieceSize {
out := make([]abi.UnpaddedPieceSize, len(si.Pieces))
for i := range out {
out[i] = si.Pieces[i].Size.Unpadded()
}
return nil
}
type Worker interface {
AddPiece(context.Context, SectorInfo, abi.UnpaddedPieceSize, io.Reader) (cid.Cid, SectorInfo, error)
Run(context.Context, TaskType, SectorInfo) (SectorInfo, error)
}
type Manager interface {
// NewSector allocates staging area for data
NewSector() (SectorInfo, error)
// AddPiece appends the piece to the specified sector. Returns PieceCID, and
// mutated sector info
//
// Note: The passed reader can support other transfer mechanisms, making
// it possible to move the data between data transfer module and workers
AddPiece(context.Context, SectorInfo, abi.UnpaddedPieceSize, io.Reader) (cid.Cid, SectorInfo, error)
RunSeal(ctx context.Context, task TaskType, si SectorInfo) (SectorInfo, error)
// Storage manager forwards proving calls
sectorbuilder.Prover
}

View File

@ -0,0 +1,74 @@
package sealmgr
import (
"context"
"io"
"github.com/ipfs/go-cid"
"golang.org/x/xerrors"
"github.com/filecoin-project/go-sectorbuilder"
"github.com/filecoin-project/specs-actors/actors/abi"
)
type LocalWorker struct {
sealer sectorbuilder.Basic
mid abi.ActorID
}
func (w *LocalWorker) Run(ctx context.Context, task TaskType, si SectorInfo) (SectorInfo, error) {
if si.ID.Miner != w.mid {
return si, xerrors.Errorf("received a task with wrong actor id; worker for %d, task for %d", w.mid, si.ID.Miner)
}
switch task {
case TTPreCommit1:
pco, err := w.sealer.SealPreCommit1(ctx, si.ID.Number, si.Ticket.Value, si.Pieces)
if err != nil {
return si, xerrors.Errorf("calling sealer: %w", err)
}
si.PreCommit1Out = pco
case TTPreCommit2:
sealed, unsealed, err := w.sealer.SealPreCommit2(ctx, si.ID.Number, si.PreCommit1Out)
if err != nil {
return si, xerrors.Errorf("calling sealer (precommit2): %w", err)
}
si.Sealed = &sealed
si.Unsealed = &unsealed
// We also call Commit1 here as it only grabs some inputs for the snark,
// which is very fast (<1s), and it doesn't really make sense to have a separate
// task type for it
c2in, err := w.sealer.SealCommit1(ctx, si.ID.Number, si.Ticket.Value, si.Seed.Value, si.Pieces, *si.Sealed, *si.Unsealed)
if err != nil {
return si, xerrors.Errorf("calling sealer (commit1): %w", err)
}
si.CommitInput = c2in
case TTCommit2:
proof, err := w.sealer.SealCommit2(ctx, si.ID.Number, si.CommitInput)
if err != nil {
return SectorInfo{}, xerrors.Errorf("calling sealer: %w", err)
}
si.Proof = proof
default:
return si, xerrors.Errorf("unknown task type '%s'", task)
}
return si, nil
}
func (w *LocalWorker) AddPiece(ctx context.Context, si SectorInfo, sz abi.UnpaddedPieceSize, r io.Reader) (cid.Cid, SectorInfo, error) {
pi, err := w.sealer.AddPiece(ctx, sz, si.ID.Number, r, si.PieceSizes())
if err != nil {
return cid.Cid{}, SectorInfo{}, xerrors.Errorf("addPiece on local worker: %w", err)
}
si.Pieces = append(si.Pieces, pi)
return pi.PieceCID, si, nil
}
var _ Worker = &LocalWorker{}