lotus/storage/sealing.go

306 lines
6.6 KiB
Go
Raw Normal View History

2019-10-31 18:46:53 +00:00
package storage
import (
"context"
2019-11-07 18:22:59 +00:00
"io"
2019-11-01 22:05:05 +00:00
2019-10-31 18:46:53 +00:00
cid "github.com/ipfs/go-cid"
2019-11-01 22:05:05 +00:00
xerrors "golang.org/x/xerrors"
2019-11-07 18:22:59 +00:00
"github.com/filecoin-project/lotus/api"
2019-12-02 12:51:16 +00:00
"github.com/filecoin-project/lotus/lib/padreader"
2019-11-07 18:43:15 +00:00
"github.com/filecoin-project/lotus/lib/sectorbuilder"
2019-10-31 18:46:53 +00:00
)
2019-11-07 18:22:59 +00:00
// TicketFn is a callback that, given a context, returns a sectorbuilder seal
// ticket or an error.
type TicketFn func(context.Context) (*sectorbuilder.SealTicket, error)
2019-11-01 13:58:48 +00:00
// SealTicket is the storage package's representation of a seal ticket: a
// block height together with the raw ticket bytes. The SB method converts it
// into the sectorbuilder form.
type SealTicket struct {
	// BlockHeight is the chain height associated with the ticket
	// (presumably the height the ticket was drawn from; confirm with callers).
	BlockHeight uint64
	// TicketBytes holds the raw ticket; SB copies it into
	// sectorbuilder.SealTicket.TicketBytes.
	TicketBytes []byte
}
2019-10-31 18:46:53 +00:00
2019-11-08 18:15:13 +00:00
func (t *SealTicket) SB() sectorbuilder.SealTicket {
2019-11-07 18:22:59 +00:00
out := sectorbuilder.SealTicket{BlockHeight: t.BlockHeight}
copy(out.TicketBytes[:], t.TicketBytes)
return out
}
2019-11-08 18:15:13 +00:00
// SealSeed is the storage package's representation of a seal seed: a block
// height together with the raw seed bytes. The SB method converts it into the
// sectorbuilder form.
type SealSeed struct {
	// BlockHeight is the chain height associated with the seed
	// (presumably the height the seed was drawn from; confirm with callers).
	BlockHeight uint64
	// TicketBytes holds the raw seed bytes; SB copies it into
	// sectorbuilder.SealSeed.TicketBytes.
	TicketBytes []byte
}
// SB returns the sectorbuilder representation of the seed.
//
// The block height is carried over unchanged and the seed bytes are copied
// into the destination's TicketBytes field. copy transfers at most as many
// bytes as the destination holds; a shorter source leaves the remainder zero.
func (t *SealSeed) SB() sectorbuilder.SealSeed {
	var sbSeed sectorbuilder.SealSeed
	sbSeed.BlockHeight = t.BlockHeight
	copy(sbSeed.TicketBytes[:], t.TicketBytes)
	return sbSeed
}
2019-11-07 18:22:59 +00:00
// Piece describes a single piece stored in a sector.
type Piece struct {
	// DealID identifies the deal the piece belongs to.
	DealID uint64

	// Size is the piece size as reported in sectorbuilder.PublicPieceInfo
	// (see newSector, which fills it from ppi.Size).
	Size uint64
	// CommP is the piece commitment; ppi copies it into
	// sectorbuilder.PublicPieceInfo.CommP.
	CommP []byte
}
// ppi converts the piece into a sectorbuilder.PublicPieceInfo, carrying over
// the size and copying the piece commitment into the fixed CommP field.
func (p *Piece) ppi() (out sectorbuilder.PublicPieceInfo) {
	out = sectorbuilder.PublicPieceInfo{Size: p.Size}
	copy(out.CommP[:], p.CommP)
	return out
}
2019-11-01 13:58:48 +00:00
type SectorInfo struct {
State api.SectorState
SectorID uint64
2019-10-31 18:46:53 +00:00
2019-11-07 18:22:59 +00:00
// Packing
Pieces []Piece
2019-11-01 13:58:48 +00:00
// PreCommit
2019-11-07 18:22:59 +00:00
CommC []byte
CommD []byte
CommR []byte
CommRLast []byte
2019-11-08 18:15:13 +00:00
Proof []byte
2019-11-07 18:43:15 +00:00
Ticket SealTicket
2019-10-31 18:46:53 +00:00
2019-11-01 13:58:48 +00:00
PreCommitMessage *cid.Cid
2019-11-01 13:58:48 +00:00
// PreCommitted
2019-11-08 18:15:13 +00:00
Seed SealSeed
2019-10-31 18:46:53 +00:00
2019-11-01 13:58:48 +00:00
// Committing
CommitMessage *cid.Cid
}
2019-10-31 18:46:53 +00:00
2019-11-01 13:58:48 +00:00
// sectorUpdate is the message sent on Miner.sectorUpdated to move a sector to
// a new state. It is consumed by onSectorUpdated.
type sectorUpdate struct {
	// newState is written to SectorInfo.State and selects the next handler.
	newState api.SectorState
	// id is the ID of the sector being updated.
	id uint64
	// err, when non-nil, makes onSectorUpdated fail the sector instead of
	// dispatching to a handler.
	err error
	// mut is an optional extra mutation applied to the stored SectorInfo
	// alongside the state change; nil means no extra mutation.
	mut func(*SectorInfo)
}
2019-10-31 18:46:53 +00:00
2019-11-07 18:22:59 +00:00
// pieceInfos returns the sectorbuilder.PublicPieceInfo for every piece in the
// sector, in the same order as t.Pieces.
func (t *SectorInfo) pieceInfos() []sectorbuilder.PublicPieceInfo {
	infos := make([]sectorbuilder.PublicPieceInfo, 0, len(t.Pieces))
	for idx := range t.Pieces {
		infos = append(infos, t.Pieces[idx].ppi())
	}
	return infos
}
// deals returns the deal ID of every piece in the sector, in the same order
// as t.Pieces.
func (t *SectorInfo) deals() []uint64 {
	dealIDs := make([]uint64, 0, len(t.Pieces))
	for idx := range t.Pieces {
		dealIDs = append(dealIDs, t.Pieces[idx].DealID)
	}
	return dealIDs
}
// existingPieces returns the size of every piece in the sector, in the same
// order as t.Pieces.
func (t *SectorInfo) existingPieces() []uint64 {
	sizes := make([]uint64, 0, len(t.Pieces))
	for idx := range t.Pieces {
		sizes = append(sizes, t.Pieces[idx].Size)
	}
	return sizes
}
2019-11-07 18:22:59 +00:00
// rspco assembles a sectorbuilder.RawSealPreCommitOutput from the commitments
// stored on the sector. Each commitment is copied into the corresponding
// field; copy transfers at most as many bytes as the destination holds.
func (t *SectorInfo) rspco() sectorbuilder.RawSealPreCommitOutput {
	preCommit := sectorbuilder.RawSealPreCommitOutput{}

	copy(preCommit.CommC[:], t.CommC)
	copy(preCommit.CommD[:], t.CommD)
	copy(preCommit.CommR[:], t.CommR)
	copy(preCommit.CommRLast[:], t.CommRLast)

	return preCommit
}
2019-11-17 12:12:05 +00:00
func (m *Miner) sectorStateLoop(ctx context.Context) error {
trackedSectors, err := m.ListSectors()
2019-11-17 12:12:05 +00:00
if err != nil {
return err
}
go func() {
for _, si := range trackedSectors {
2019-11-17 12:12:05 +00:00
select {
case m.sectorUpdated <- sectorUpdate{
newState: si.State,
id: si.SectorID,
err: nil,
mut: nil,
}:
case <-ctx.Done():
log.Warn("didn't restart processing for all sectors: ", ctx.Err())
return
}
}
}()
2019-10-31 18:46:53 +00:00
{
// verify on-chain state
trackedByID := map[uint64]*SectorInfo{}
for _, si := range trackedSectors {
2019-12-02 12:51:16 +00:00
i := si
trackedByID[si.SectorID] = &i
}
curTs, err := m.api.ChainHead(ctx)
if err != nil {
return xerrors.Errorf("getting chain head: %w", err)
}
ps, err := m.api.StateMinerProvingSet(ctx, m.maddr, curTs)
if err != nil {
return err
}
for _, ocs := range ps {
if _, ok := trackedByID[ocs.SectorID]; ok {
continue // TODO: check state
}
// TODO: attempt recovery
log.Warnf("untracked sector %d found on chain", ocs.SectorID)
}
}
2019-11-01 13:58:48 +00:00
go func() {
defer log.Warn("quitting deal provider loop")
defer close(m.stopped)
for {
select {
case sector := <-m.sectorIncoming:
m.onSectorIncoming(sector)
case update := <-m.sectorUpdated:
m.onSectorUpdated(ctx, update)
case <-m.stop:
return
}
}
}()
2019-11-17 12:12:05 +00:00
return nil
2019-11-01 13:58:48 +00:00
}
2019-10-31 18:46:53 +00:00
2019-11-01 13:58:48 +00:00
func (m *Miner) onSectorIncoming(sector *SectorInfo) {
2019-11-07 14:45:53 +00:00
has, err := m.sectors.Has(sector.SectorID)
if err != nil {
return
}
if has {
2019-11-07 18:22:59 +00:00
log.Warnf("SealPiece called more than once for sector %d", sector.SectorID)
2019-11-07 14:45:53 +00:00
return
}
2019-11-01 13:58:48 +00:00
if err := m.sectors.Begin(sector.SectorID, sector); err != nil {
// We may have re-sent the proposal
log.Errorf("deal tracking failed: %s", err)
m.failSector(sector.SectorID, err)
return
}
2019-10-31 18:46:53 +00:00
2019-11-01 13:58:48 +00:00
go func() {
2019-11-01 22:05:05 +00:00
select {
case m.sectorUpdated <- sectorUpdate{
2019-11-06 23:09:48 +00:00
newState: api.Packing,
2019-11-01 13:58:48 +00:00
id: sector.SectorID,
2019-11-01 22:05:05 +00:00
}:
case <-m.stop:
log.Warn("failed to send incoming sector update, miner shutting down")
2019-11-01 13:58:48 +00:00
}
}()
2019-10-31 18:46:53 +00:00
}
2019-11-01 13:58:48 +00:00
func (m *Miner) onSectorUpdated(ctx context.Context, update sectorUpdate) {
2019-11-01 22:44:55 +00:00
log.Infof("Sector %d updated state to %s", update.id, api.SectorStateStr(update.newState))
2019-11-01 13:58:48 +00:00
var sector SectorInfo
err := m.sectors.Mutate(update.id, func(s *SectorInfo) error {
s.State = update.newState
2019-11-01 22:44:55 +00:00
if update.mut != nil {
update.mut(s)
}
2019-11-01 13:58:48 +00:00
sector = *s
return nil
})
if update.err != nil {
2019-11-01 22:44:55 +00:00
log.Errorf("sector %d failed: %s", update.id, update.err)
2019-11-01 13:58:48 +00:00
m.failSector(update.id, update.err)
2019-10-31 18:46:53 +00:00
return
}
2019-11-01 13:58:48 +00:00
if err != nil {
m.failSector(update.id, err)
return
}
2019-11-01 13:58:48 +00:00
switch update.newState {
2019-11-06 23:09:48 +00:00
case api.Packing:
m.handleSectorUpdate(ctx, sector, m.finishPacking, api.Unsealed)
2019-11-01 13:58:48 +00:00
case api.Unsealed:
m.handleSectorUpdate(ctx, sector, m.sealPreCommit, api.PreCommitting)
2019-11-01 13:58:48 +00:00
case api.PreCommitting:
m.handleSectorUpdate(ctx, sector, m.preCommit, api.PreCommitted)
2019-11-01 13:58:48 +00:00
case api.PreCommitted:
m.handleSectorUpdate(ctx, sector, m.preCommitted, api.SectorNoUpdate)
2019-11-01 13:58:48 +00:00
case api.Committing:
m.handleSectorUpdate(ctx, sector, m.committing, api.Proving)
case api.Proving:
// TODO: track sector health / expiration
log.Infof("Proving sector %d", update.id)
case api.SectorNoUpdate: // noop
default:
log.Errorf("unexpected sector update state: %d", update.newState)
2019-10-31 18:46:53 +00:00
}
}
2019-11-01 13:58:48 +00:00
func (m *Miner) failSector(id uint64, err error) {
2019-11-06 12:04:33 +00:00
log.Errorf("sector %d error: %+v", id, err)
2019-11-01 13:58:48 +00:00
}
2019-10-31 18:46:53 +00:00
2019-12-01 17:58:31 +00:00
func (m *Miner) AllocatePiece(size uint64) (sectorID uint64, offset uint64, err error) {
if padreader.PaddedSize(size) != size {
return 0, 0, xerrors.Errorf("cannot allocate unpadded piece")
}
2019-11-01 22:44:55 +00:00
2019-11-07 18:22:59 +00:00
sid, err := m.sb.AcquireSectorId() // TODO: Put more than one thing in a sector
if err != nil {
2019-12-01 17:58:31 +00:00
return 0, 0, xerrors.Errorf("acquiring sector ID: %w", err)
2019-11-07 18:22:59 +00:00
}
2019-12-01 17:58:31 +00:00
// offset hard-coded to 0 since we only put one thing in a sector for now
return sid, 0, nil
}
func (m *Miner) SealPiece(ctx context.Context, size uint64, r io.Reader, sectorID uint64, dealID uint64) error {
log.Infof("Seal piece for deal %d", dealID)
ppi, err := m.sb.AddPiece(size, sectorID, r, []uint64{})
2019-11-07 18:22:59 +00:00
if err != nil {
2019-12-01 17:58:31 +00:00
return xerrors.Errorf("adding piece to sector: %w", err)
2019-11-07 18:22:59 +00:00
}
2019-12-01 17:58:31 +00:00
return m.newSector(ctx, sectorID, dealID, ppi)
2019-11-07 18:22:59 +00:00
}
2019-12-01 17:58:31 +00:00
func (m *Miner) newSector(ctx context.Context, sid uint64, dealID uint64, ppi sectorbuilder.PublicPieceInfo) error {
2019-11-01 22:05:05 +00:00
si := &SectorInfo{
2019-11-01 13:58:48 +00:00
SectorID: sid,
2019-11-07 18:22:59 +00:00
Pieces: []Piece{
{
DealID: dealID,
2019-11-07 18:43:15 +00:00
Size: ppi.Size,
CommP: ppi.CommP[:],
2019-11-07 18:22:59 +00:00
},
},
2019-11-01 22:05:05 +00:00
}
select {
case m.sectorIncoming <- si:
2019-11-01 13:58:48 +00:00
return nil
case <-ctx.Done():
2019-11-01 22:05:05 +00:00
return xerrors.Errorf("failed to submit sector for sealing, queue full: %w", ctx.Err())
2019-11-01 13:58:48 +00:00
}
2019-10-31 18:46:53 +00:00
}