lotus/storage/sealing.go

129 lines
2.7 KiB
Go
Raw Normal View History

2019-10-31 18:46:53 +00:00
package storage
import (
"context"
2019-11-01 22:05:05 +00:00
2019-11-01 13:58:48 +00:00
"github.com/filecoin-project/lotus/api"
2019-10-31 18:46:53 +00:00
"github.com/filecoin-project/lotus/chain/types"
cid "github.com/ipfs/go-cid"
2019-11-01 22:05:05 +00:00
xerrors "golang.org/x/xerrors"
2019-10-31 18:46:53 +00:00
)
2019-11-01 13:58:48 +00:00
type SealTicket struct {
BlockHeight uint64
TicketBytes []byte
}
2019-10-31 18:46:53 +00:00
2019-11-01 13:58:48 +00:00
type SectorInfo struct {
State api.SectorState
SectorID uint64
2019-10-31 18:46:53 +00:00
2019-11-01 13:58:48 +00:00
// PreCommit
CommD []byte
CommR []byte
Ticket SealTicket
2019-10-31 18:46:53 +00:00
2019-11-01 13:58:48 +00:00
PreCommitMessage *cid.Cid
2019-11-01 13:58:48 +00:00
// PreCommitted
RandHeight uint64
RandTs *types.TipSet
2019-10-31 18:46:53 +00:00
2019-11-01 13:58:48 +00:00
// Committing
CommitMessage *cid.Cid
}
2019-10-31 18:46:53 +00:00
2019-11-01 13:58:48 +00:00
type sectorUpdate struct {
newState api.SectorState
id uint64
err error
mut func(*SectorInfo)
}
2019-10-31 18:46:53 +00:00
2019-11-01 13:58:48 +00:00
func (m *Miner) sectorStateLoop(ctx context.Context) {
// TODO: restore state
2019-10-31 18:46:53 +00:00
2019-11-01 13:58:48 +00:00
go func() {
defer log.Warn("quitting deal provider loop")
defer close(m.stopped)
for {
select {
case sector := <-m.sectorIncoming:
m.onSectorIncoming(sector)
case update := <-m.sectorUpdated:
m.onSectorUpdated(ctx, update)
case <-m.stop:
return
}
}
}()
}
2019-10-31 18:46:53 +00:00
2019-11-01 13:58:48 +00:00
func (m *Miner) onSectorIncoming(sector *SectorInfo) {
if err := m.sectors.Begin(sector.SectorID, sector); err != nil {
// We may have re-sent the proposal
log.Errorf("deal tracking failed: %s", err)
m.failSector(sector.SectorID, err)
return
}
2019-10-31 18:46:53 +00:00
2019-11-01 13:58:48 +00:00
go func() {
2019-11-01 22:05:05 +00:00
select {
case m.sectorUpdated <- sectorUpdate{
2019-11-01 13:58:48 +00:00
newState: api.Unsealed,
id: sector.SectorID,
2019-11-01 22:05:05 +00:00
}:
case <-m.stop:
log.Warn("failed to send incoming sector update, miner shutting down")
2019-11-01 13:58:48 +00:00
}
}()
2019-10-31 18:46:53 +00:00
}
2019-11-01 13:58:48 +00:00
func (m *Miner) onSectorUpdated(ctx context.Context, update sectorUpdate) {
log.Infof("Sector %s updated state to %s", update.id, api.SectorStateStr(update.newState))
var sector SectorInfo
err := m.sectors.Mutate(update.id, func(s *SectorInfo) error {
s.State = update.newState
sector = *s
return nil
})
if update.err != nil {
log.Errorf("deal %s failed: %s", update.id, update.err)
m.failSector(update.id, update.err)
2019-10-31 18:46:53 +00:00
return
}
2019-11-01 13:58:48 +00:00
if err != nil {
m.failSector(update.id, err)
return
}
2019-11-01 13:58:48 +00:00
switch update.newState {
case api.Unsealed:
m.handle(ctx, sector, m.sealPreCommit, api.PreCommitting)
case api.PreCommitting:
m.handle(ctx, sector, m.preCommit, api.PreCommitted)
case api.PreCommitted:
m.handle(ctx, sector, m.preCommitted, api.SectorNoUpdate)
case api.Committing:
m.handle(ctx, sector, m.committing, api.Proving)
2019-10-31 18:46:53 +00:00
}
}
2019-11-01 13:58:48 +00:00
func (m *Miner) failSector(id uint64, err error) {
panic(err) // todo: better error handling strategy
}
2019-10-31 18:46:53 +00:00
2019-11-01 13:58:48 +00:00
func (m *Miner) SealSector(ctx context.Context, sid uint64) error {
2019-11-01 22:05:05 +00:00
si := &SectorInfo{
2019-11-01 13:58:48 +00:00
State: api.UndefinedSectorState,
SectorID: sid,
2019-11-01 22:05:05 +00:00
}
select {
case m.sectorIncoming <- si:
2019-11-01 13:58:48 +00:00
return nil
case <-ctx.Done():
2019-11-01 22:05:05 +00:00
return xerrors.Errorf("failed to submit sector for sealing, queue full: %w", ctx.Err())
2019-11-01 13:58:48 +00:00
}
2019-10-31 18:46:53 +00:00
}