lotus/storage/pipeline/receive.go

181 lines
4.7 KiB
Go
Raw Normal View History

2022-08-24 19:27:27 +00:00
package sealing
import (
"context"
2022-08-24 19:27:27 +00:00
"github.com/ipfs/go-datastore"
"golang.org/x/xerrors"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/go-statemachine"
2022-08-24 19:27:27 +00:00
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/storage/sealer/storiface"
2022-08-24 19:27:27 +00:00
)
// todo m.inputLk?
2022-08-24 19:27:27 +00:00
func (m *Sealing) Receive(ctx context.Context, meta api.RemoteSectorMeta) error {
si, err := m.checkSectorMeta(ctx, meta)
if err != nil {
2022-08-24 19:27:27 +00:00
return err
}
exists, err := m.sectors.Has(uint64(meta.Sector.Number))
if err != nil {
2022-08-30 19:45:36 +00:00
return xerrors.Errorf("checking if sector exists: %w", err)
}
if exists {
2022-08-30 19:45:36 +00:00
return xerrors.Errorf("sector %d state already exists", meta.Sector.Number)
}
err = m.sectors.Send(uint64(meta.Sector.Number), SectorReceive{
State: si,
})
if err != nil {
return xerrors.Errorf("receiving sector: %w", err)
}
return nil
2022-08-24 19:27:27 +00:00
}
func (m *Sealing) checkSectorMeta(ctx context.Context, meta api.RemoteSectorMeta) (SectorInfo, error) {
2022-08-24 19:27:27 +00:00
{
mid, err := address.IDFromAddress(m.maddr)
if err != nil {
panic(err)
}
if meta.Sector.Miner != abi.ActorID(mid) {
return SectorInfo{}, xerrors.Errorf("sector for wrong actor - expected actor id %d, sector was for actor %d", mid, meta.Sector.Miner)
2022-08-24 19:27:27 +00:00
}
}
{
// initial sanity check, doesn't prevent races
_, err := m.GetSectorInfo(meta.Sector.Number)
if err != nil && !xerrors.Is(err, datastore.ErrNotFound) {
return SectorInfo{}, err
2022-08-24 19:27:27 +00:00
}
if err == nil {
return SectorInfo{}, xerrors.Errorf("sector with ID %d already exists in the sealing pipeline", meta.Sector.Number)
2022-08-24 19:27:27 +00:00
}
}
{
spt, err := m.currentSealProof(ctx)
if err != nil {
return SectorInfo{}, err
2022-08-24 19:27:27 +00:00
}
if meta.Type != spt {
return SectorInfo{}, xerrors.Errorf("sector seal proof type doesn't match current seal proof type (%d!=%d)", meta.Type, spt)
2022-08-24 19:27:27 +00:00
}
}
2022-08-30 19:45:36 +00:00
var info SectorInfo
2022-08-24 19:27:27 +00:00
switch SectorState(meta.State) {
2022-08-30 19:45:36 +00:00
case Proving, Available:
// todo possibly check
info.CommitMessage = meta.CommitMessage
2022-08-24 19:27:27 +00:00
fallthrough
2022-08-30 19:45:36 +00:00
case SubmitCommit:
if meta.PreCommitDeposit == nil {
return SectorInfo{}, xerrors.Errorf("sector PreCommitDeposit was null")
}
2022-08-30 19:45:36 +00:00
info.PreCommitInfo = meta.PreCommitInfo
info.PreCommitDeposit = *meta.PreCommitDeposit
2022-08-30 19:45:36 +00:00
info.PreCommitMessage = meta.PreCommitMessage
info.PreCommitTipSet = meta.PreCommitTipSet
// todo check
info.SeedValue = meta.SeedValue
info.SeedEpoch = meta.SeedEpoch
// todo validate
info.Proof = meta.CommitProof
2022-08-24 19:27:27 +00:00
fallthrough
case PreCommitting:
2022-08-30 19:45:36 +00:00
info.TicketValue = meta.TicketValue
info.TicketEpoch = meta.TicketEpoch
2022-08-24 19:27:27 +00:00
2022-08-30 19:45:36 +00:00
info.PreCommit1Out = meta.PreCommit1Out
info.CommD = meta.CommD // todo check cid prefixes
info.CommR = meta.CommR
2022-08-24 19:27:27 +00:00
if meta.DataSealed == nil {
return SectorInfo{}, xerrors.Errorf("expected DataSealed to be set")
}
if meta.DataCache == nil {
return SectorInfo{}, xerrors.Errorf("expected DataCache to be set")
}
info.RemoteDataSealed = meta.DataSealed
info.RemoteDataCache = meta.DataCache
// If we get a sector after PC2, assume that we're getting finalized sector data
// todo: maybe only set if C1 provider is set?
info.RemoteDataFinalized = true
2022-08-24 19:27:27 +00:00
fallthrough
2022-08-30 19:45:36 +00:00
case GetTicket:
fallthrough
case Packing:
// todo check num free
info.Return = ReturnState(meta.State) // todo dedupe states
info.State = ReceiveSector
2022-08-30 19:45:36 +00:00
info.SectorNumber = meta.Sector.Number
info.Pieces = meta.Pieces
info.SectorType = meta.Type
if err := checkPieces(ctx, m.maddr, meta.Sector.Number, meta.Pieces, m.Api, false); err != nil {
return SectorInfo{}, xerrors.Errorf("checking pieces: %w", err)
2022-08-30 19:45:36 +00:00
}
2022-08-24 19:27:27 +00:00
if meta.DataUnsealed == nil {
return SectorInfo{}, xerrors.Errorf("expected DataUnsealed to be set")
}
info.RemoteDataUnsealed = meta.DataUnsealed
return info, nil
2022-08-24 19:27:27 +00:00
default:
return SectorInfo{}, xerrors.Errorf("imported sector State in not supported")
2022-08-24 19:27:27 +00:00
}
}
func (m *Sealing) handleReceiveSector(ctx statemachine.Context, sector SectorInfo) error {
toFetch := map[storiface.SectorFileType]storiface.SectorData{}
for fileType, data := range map[storiface.SectorFileType]*storiface.SectorData{
storiface.FTUnsealed: sector.RemoteDataUnsealed,
storiface.FTSealed: sector.RemoteDataSealed,
storiface.FTCache: sector.RemoteDataCache,
} {
if data == nil {
continue
}
if data.Local {
// todo check exists
continue
}
toFetch[fileType] = *data
}
if len(toFetch) > 0 {
if err := m.sealer.DownloadSectorData(ctx.Context(), m.minerSector(sector.SectorType, sector.SectorNumber), sector.RemoteDataFinalized, toFetch); err != nil {
return xerrors.Errorf("downloading sector data: %w", err) // todo send err event
}
}
// todo data checks?
return ctx.Send(SectorReceived{})
}