Move miner sealing logic into a separate package
This commit is contained in:
commit
03f31d74f1
1092
cbor_gen.go
Normal file
1092
cbor_gen.go
Normal file
File diff suppressed because it is too large
Load Diff
127
garbage.go
Normal file
127
garbage.go
Normal file
@ -0,0 +1,127 @@
|
|||||||
|
package sealing
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
"context"
|
||||||
|
"io"
|
||||||
|
"math"
|
||||||
|
"math/rand"
|
||||||
|
|
||||||
|
sectorbuilder "github.com/filecoin-project/go-sectorbuilder"
|
||||||
|
"golang.org/x/xerrors"
|
||||||
|
|
||||||
|
"github.com/filecoin-project/lotus/chain/actors"
|
||||||
|
"github.com/filecoin-project/lotus/chain/types"
|
||||||
|
)
|
||||||
|
|
||||||
|
func (m *Sealing) pledgeSector(ctx context.Context, sectorID uint64, existingPieceSizes []uint64, sizes ...uint64) ([]Piece, error) {
|
||||||
|
if len(sizes) == 0 {
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
deals := make([]actors.StorageDealProposal, len(sizes))
|
||||||
|
for i, size := range sizes {
|
||||||
|
release := m.sb.RateLimit()
|
||||||
|
commP, err := sectorbuilder.GeneratePieceCommitment(io.LimitReader(rand.New(rand.NewSource(42)), int64(size)), size)
|
||||||
|
release()
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
sdp := actors.StorageDealProposal{
|
||||||
|
PieceRef: commP[:],
|
||||||
|
PieceSize: size,
|
||||||
|
Client: m.worker,
|
||||||
|
Provider: m.maddr,
|
||||||
|
ProposalExpiration: math.MaxUint64,
|
||||||
|
Duration: math.MaxUint64 / 2, // /2 because overflows
|
||||||
|
StoragePricePerEpoch: types.NewInt(0),
|
||||||
|
StorageCollateral: types.NewInt(0),
|
||||||
|
ProposerSignature: nil, // nil because self dealing
|
||||||
|
}
|
||||||
|
|
||||||
|
deals[i] = sdp
|
||||||
|
}
|
||||||
|
|
||||||
|
params, aerr := actors.SerializeParams(&actors.PublishStorageDealsParams{
|
||||||
|
Deals: deals,
|
||||||
|
})
|
||||||
|
if aerr != nil {
|
||||||
|
return nil, xerrors.Errorf("serializing PublishStorageDeals params failed: ", aerr)
|
||||||
|
}
|
||||||
|
|
||||||
|
smsg, err := m.api.MpoolPushMessage(ctx, &types.Message{
|
||||||
|
To: actors.StorageMarketAddress,
|
||||||
|
From: m.worker,
|
||||||
|
Value: types.NewInt(0),
|
||||||
|
GasPrice: types.NewInt(0),
|
||||||
|
GasLimit: types.NewInt(1000000),
|
||||||
|
Method: actors.SMAMethods.PublishStorageDeals,
|
||||||
|
Params: params,
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
r, err := m.api.StateWaitMsg(ctx, smsg.Cid())
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
if r.Receipt.ExitCode != 0 {
|
||||||
|
log.Error(xerrors.Errorf("publishing deal failed: exit %d", r.Receipt.ExitCode))
|
||||||
|
}
|
||||||
|
var resp actors.PublishStorageDealResponse
|
||||||
|
if err := resp.UnmarshalCBOR(bytes.NewReader(r.Receipt.Return)); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
if len(resp.DealIDs) != len(sizes) {
|
||||||
|
return nil, xerrors.New("got unexpected number of DealIDs from PublishStorageDeals")
|
||||||
|
}
|
||||||
|
|
||||||
|
out := make([]Piece, len(sizes))
|
||||||
|
|
||||||
|
for i, size := range sizes {
|
||||||
|
ppi, err := m.sb.AddPiece(size, sectorID, io.LimitReader(rand.New(rand.NewSource(42)), int64(size)), existingPieceSizes)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
existingPieceSizes = append(existingPieceSizes, size)
|
||||||
|
|
||||||
|
out[i] = Piece{
|
||||||
|
DealID: resp.DealIDs[i],
|
||||||
|
Size: ppi.Size,
|
||||||
|
CommP: ppi.CommP[:],
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return out, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *Sealing) PledgeSector() error {
|
||||||
|
go func() {
|
||||||
|
ctx := context.TODO() // we can't use the context from command which invokes
|
||||||
|
// this, as we run everything here async, and it's cancelled when the
|
||||||
|
// command exits
|
||||||
|
|
||||||
|
size := sectorbuilder.UserBytesForSectorSize(m.sb.SectorSize())
|
||||||
|
|
||||||
|
sid, err := m.sb.AcquireSectorId()
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("%+v", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
pieces, err := m.pledgeSector(ctx, sid, []uint64{}, size)
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("%+v", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := m.newSector(context.TODO(), sid, pieces[0].DealID, pieces[0].ppi()); err != nil {
|
||||||
|
log.Errorf("%+v", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
return nil
|
||||||
|
}
|
264
sector_fsm.go
Normal file
264
sector_fsm.go
Normal file
@ -0,0 +1,264 @@
|
|||||||
|
package sealing
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
|
||||||
|
"github.com/ipfs/go-cid"
|
||||||
|
"golang.org/x/xerrors"
|
||||||
|
|
||||||
|
"github.com/filecoin-project/lotus/api"
|
||||||
|
"github.com/filecoin-project/lotus/lib/statemachine"
|
||||||
|
)
|
||||||
|
|
||||||
|
type SectorStart struct {
|
||||||
|
id uint64
|
||||||
|
pieces []Piece
|
||||||
|
}
|
||||||
|
type SectorRestart struct{}
|
||||||
|
|
||||||
|
type SectorFatalError struct{ error }
|
||||||
|
|
||||||
|
type SectorPacked struct{ pieces []Piece }
|
||||||
|
|
||||||
|
type SectorSealed struct {
|
||||||
|
commR []byte
|
||||||
|
commD []byte
|
||||||
|
ticket SealTicket
|
||||||
|
}
|
||||||
|
type SectorSealFailed struct{ error }
|
||||||
|
|
||||||
|
type SectorPreCommitFailed struct{ error }
|
||||||
|
type SectorPreCommitted struct {
|
||||||
|
message cid.Cid
|
||||||
|
}
|
||||||
|
|
||||||
|
type SectorSeedReady struct {
|
||||||
|
seed SealSeed
|
||||||
|
}
|
||||||
|
|
||||||
|
type SectorSealCommitFailed struct{ error }
|
||||||
|
type SectorCommitFailed struct{ error }
|
||||||
|
type SectorCommitted struct {
|
||||||
|
message cid.Cid
|
||||||
|
proof []byte
|
||||||
|
}
|
||||||
|
|
||||||
|
type SectorProving struct{}
|
||||||
|
|
||||||
|
type SectorFaultReported struct{ reportMsg cid.Cid }
|
||||||
|
type SectorFaultedFinal struct{}
|
||||||
|
|
||||||
|
type SectorForceState struct {
|
||||||
|
state api.SectorState
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *Sealing) Plan(events []statemachine.Event, user interface{}) (interface{}, error) {
|
||||||
|
next, err := m.plan(events, user.(*SectorInfo))
|
||||||
|
if err != nil || next == nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return func(ctx statemachine.Context, si SectorInfo) error {
|
||||||
|
err := next(ctx, si)
|
||||||
|
if err != nil {
|
||||||
|
if err := ctx.Send(SectorFatalError{error: err}); err != nil {
|
||||||
|
return xerrors.Errorf("error while sending error: reporting %+v: %w", err, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *Sealing) plan(events []statemachine.Event, state *SectorInfo) (func(statemachine.Context, SectorInfo) error, error) {
|
||||||
|
/////
|
||||||
|
// First process all events
|
||||||
|
|
||||||
|
for _, event := range events {
|
||||||
|
if err, ok := event.User.(error); ok {
|
||||||
|
state.LastErr = fmt.Sprintf("%+v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
switch event := event.User.(type) {
|
||||||
|
case SectorStart:
|
||||||
|
// TODO: check if state is clean
|
||||||
|
state.SectorID = event.id
|
||||||
|
state.Pieces = event.pieces
|
||||||
|
state.State = api.Packing
|
||||||
|
|
||||||
|
case SectorRestart:
|
||||||
|
// noop
|
||||||
|
case SectorFatalError:
|
||||||
|
log.Errorf("Fatal error on sector %d: %+v", state.SectorID, event.error)
|
||||||
|
// TODO: Do we want to mark the state as unrecoverable?
|
||||||
|
// I feel like this should be a softer error, where the user would
|
||||||
|
// be able to send a retry event of some kind
|
||||||
|
return nil, nil
|
||||||
|
|
||||||
|
// // TODO: Incoming
|
||||||
|
// TODO: for those - look at dealIDs matching chain
|
||||||
|
|
||||||
|
// //
|
||||||
|
// Packing
|
||||||
|
|
||||||
|
case SectorPacked:
|
||||||
|
// TODO: assert state
|
||||||
|
state.Pieces = append(state.Pieces, event.pieces...)
|
||||||
|
state.State = api.Unsealed
|
||||||
|
|
||||||
|
// // Unsealed
|
||||||
|
|
||||||
|
case SectorSealFailed:
|
||||||
|
// TODO: try to find out the reason, maybe retry
|
||||||
|
state.State = api.SealFailed
|
||||||
|
|
||||||
|
case SectorSealed:
|
||||||
|
state.CommD = event.commD
|
||||||
|
state.CommR = event.commR
|
||||||
|
state.Ticket = event.ticket
|
||||||
|
state.State = api.PreCommitting
|
||||||
|
|
||||||
|
// // PreCommit
|
||||||
|
|
||||||
|
case SectorPreCommitFailed:
|
||||||
|
// TODO: try to find out the reason, maybe retry
|
||||||
|
state.State = api.PreCommitFailed
|
||||||
|
case SectorPreCommitted:
|
||||||
|
state.PreCommitMessage = &event.message
|
||||||
|
state.State = api.PreCommitted
|
||||||
|
|
||||||
|
case SectorSeedReady:
|
||||||
|
state.Seed = event.seed
|
||||||
|
state.State = api.Committing
|
||||||
|
|
||||||
|
// // Commit
|
||||||
|
|
||||||
|
case SectorSealCommitFailed:
|
||||||
|
// TODO: try to find out the reason, maybe retry
|
||||||
|
state.State = api.SealCommitFailed
|
||||||
|
case SectorCommitFailed:
|
||||||
|
// TODO: try to find out the reason, maybe retry
|
||||||
|
state.State = api.SealFailed
|
||||||
|
case SectorCommitted:
|
||||||
|
state.Proof = event.proof
|
||||||
|
state.CommitMessage = &event.message
|
||||||
|
state.State = api.CommitWait
|
||||||
|
case SectorProving:
|
||||||
|
state.State = api.Proving
|
||||||
|
|
||||||
|
case SectorFaultReported:
|
||||||
|
state.FaultReportMsg = &event.reportMsg
|
||||||
|
state.State = api.FaultReported
|
||||||
|
case SectorFaultedFinal:
|
||||||
|
state.State = api.FaultedFinal
|
||||||
|
|
||||||
|
// // Debug triggers
|
||||||
|
case SectorForceState:
|
||||||
|
state.State = event.state
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/////
|
||||||
|
// Now decide what to do next
|
||||||
|
|
||||||
|
/*
|
||||||
|
|
||||||
|
* Empty
|
||||||
|
| |
|
||||||
|
| v
|
||||||
|
*<- Packing <- incoming
|
||||||
|
| |
|
||||||
|
| v
|
||||||
|
*<- Unsealed <--> SealFailed
|
||||||
|
| |
|
||||||
|
| v
|
||||||
|
* PreCommitting <--> PreCommitFailed
|
||||||
|
| | ^
|
||||||
|
| v |
|
||||||
|
*<- PreCommitted ------/
|
||||||
|
| |||
|
||||||
|
| vvv v--> SealCommitFailed
|
||||||
|
*<- Committing
|
||||||
|
| | ^--> CommitFailed
|
||||||
|
| v ^
|
||||||
|
*<- CommitWait ---/
|
||||||
|
| |
|
||||||
|
| v
|
||||||
|
*<- Proving
|
||||||
|
|
|
||||||
|
v
|
||||||
|
FailedUnrecoverable
|
||||||
|
|
||||||
|
UndefinedSectorState <- ¯\_(ツ)_/¯
|
||||||
|
| ^
|
||||||
|
*---------------------/
|
||||||
|
|
||||||
|
*/
|
||||||
|
|
||||||
|
switch state.State {
|
||||||
|
// Happy path
|
||||||
|
case api.Packing:
|
||||||
|
return m.handlePacking, nil
|
||||||
|
case api.Unsealed:
|
||||||
|
return m.handleUnsealed, nil
|
||||||
|
case api.PreCommitting:
|
||||||
|
return m.handlePreCommitting, nil
|
||||||
|
case api.PreCommitted:
|
||||||
|
return m.handlePreCommitted, nil
|
||||||
|
case api.Committing:
|
||||||
|
return m.handleCommitting, nil
|
||||||
|
case api.CommitWait:
|
||||||
|
return m.handleCommitWait, nil
|
||||||
|
case api.Proving:
|
||||||
|
// TODO: track sector health / expiration
|
||||||
|
log.Infof("Proving sector %d", state.SectorID)
|
||||||
|
|
||||||
|
// Handled failure modes
|
||||||
|
case api.SealFailed:
|
||||||
|
log.Warnf("sector %d entered unimplemented state 'SealFailed'", state.SectorID)
|
||||||
|
case api.PreCommitFailed:
|
||||||
|
log.Warnf("sector %d entered unimplemented state 'PreCommitFailed'", state.SectorID)
|
||||||
|
case api.SealCommitFailed:
|
||||||
|
log.Warnf("sector %d entered unimplemented state 'SealCommitFailed'", state.SectorID)
|
||||||
|
case api.CommitFailed:
|
||||||
|
log.Warnf("sector %d entered unimplemented state 'CommitFailed'", state.SectorID)
|
||||||
|
|
||||||
|
// Faults
|
||||||
|
case api.Faulty:
|
||||||
|
return m.handleFaulty, nil
|
||||||
|
case api.FaultReported:
|
||||||
|
return m.handleFaultReported, nil
|
||||||
|
|
||||||
|
// Fatal errors
|
||||||
|
case api.UndefinedSectorState:
|
||||||
|
log.Error("sector update with undefined state!")
|
||||||
|
case api.FailedUnrecoverable:
|
||||||
|
log.Errorf("sector %d failed unrecoverably", state.SectorID)
|
||||||
|
default:
|
||||||
|
log.Errorf("unexpected sector update state: %d", state.State)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *Sealing) restartSectors(ctx context.Context) error {
|
||||||
|
trackedSectors, err := m.ListSectors()
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("loading sector list: %+v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, sector := range trackedSectors {
|
||||||
|
if err := m.sectors.Send(sector.SectorID, SectorRestart{}); err != nil {
|
||||||
|
log.Errorf("restarting sector %d: %+v", sector.SectorID, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO: Grab on-chain sector set and diff with trackedSectors
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *Sealing) ForceSectorState(ctx context.Context, id uint64, state api.SectorState) error {
|
||||||
|
return m.sectors.Send(id, SectorForceState{state})
|
||||||
|
}
|
254
sector_states.go
Normal file
254
sector_states.go
Normal file
@ -0,0 +1,254 @@
|
|||||||
|
package sealing
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
|
||||||
|
sectorbuilder "github.com/filecoin-project/go-sectorbuilder"
|
||||||
|
"golang.org/x/xerrors"
|
||||||
|
|
||||||
|
"github.com/filecoin-project/lotus/build"
|
||||||
|
"github.com/filecoin-project/lotus/chain/actors"
|
||||||
|
"github.com/filecoin-project/lotus/chain/types"
|
||||||
|
"github.com/filecoin-project/lotus/lib/statemachine"
|
||||||
|
)
|
||||||
|
|
||||||
|
func (m *Sealing) handlePacking(ctx statemachine.Context, sector SectorInfo) error {
|
||||||
|
log.Infow("performing filling up rest of the sector...", "sector", sector.SectorID)
|
||||||
|
|
||||||
|
var allocated uint64
|
||||||
|
for _, piece := range sector.Pieces {
|
||||||
|
allocated += piece.Size
|
||||||
|
}
|
||||||
|
|
||||||
|
ubytes := sectorbuilder.UserBytesForSectorSize(m.sb.SectorSize())
|
||||||
|
|
||||||
|
if allocated > ubytes {
|
||||||
|
return xerrors.Errorf("too much data in sector: %d > %d", allocated, ubytes)
|
||||||
|
}
|
||||||
|
|
||||||
|
fillerSizes, err := fillersFromRem(ubytes - allocated)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(fillerSizes) > 0 {
|
||||||
|
log.Warnf("Creating %d filler pieces for sector %d", len(fillerSizes), sector.SectorID)
|
||||||
|
}
|
||||||
|
|
||||||
|
pieces, err := m.pledgeSector(ctx.Context(), sector.SectorID, sector.existingPieces(), fillerSizes...)
|
||||||
|
if err != nil {
|
||||||
|
return xerrors.Errorf("filling up the sector (%v): %w", fillerSizes, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return ctx.Send(SectorPacked{pieces: pieces})
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *Sealing) handleUnsealed(ctx statemachine.Context, sector SectorInfo) error {
|
||||||
|
log.Infow("performing sector replication...", "sector", sector.SectorID)
|
||||||
|
ticket, err := m.tktFn(ctx.Context())
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
rspco, err := m.sb.SealPreCommit(ctx.Context(), sector.SectorID, *ticket, sector.pieceInfos())
|
||||||
|
if err != nil {
|
||||||
|
return ctx.Send(SectorSealFailed{xerrors.Errorf("seal pre commit failed: %w", err)})
|
||||||
|
}
|
||||||
|
|
||||||
|
return ctx.Send(SectorSealed{
|
||||||
|
commD: rspco.CommD[:],
|
||||||
|
commR: rspco.CommR[:],
|
||||||
|
ticket: SealTicket{
|
||||||
|
BlockHeight: ticket.BlockHeight,
|
||||||
|
TicketBytes: ticket.TicketBytes[:],
|
||||||
|
},
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *Sealing) handlePreCommitting(ctx statemachine.Context, sector SectorInfo) error {
|
||||||
|
params := &actors.SectorPreCommitInfo{
|
||||||
|
SectorNumber: sector.SectorID,
|
||||||
|
|
||||||
|
CommR: sector.CommR,
|
||||||
|
SealEpoch: sector.Ticket.BlockHeight,
|
||||||
|
DealIDs: sector.deals(),
|
||||||
|
}
|
||||||
|
enc, aerr := actors.SerializeParams(params)
|
||||||
|
if aerr != nil {
|
||||||
|
return ctx.Send(SectorPreCommitFailed{xerrors.Errorf("could not serialize commit sector parameters: %w", aerr)})
|
||||||
|
}
|
||||||
|
|
||||||
|
msg := &types.Message{
|
||||||
|
To: m.maddr,
|
||||||
|
From: m.worker,
|
||||||
|
Method: actors.MAMethods.PreCommitSector,
|
||||||
|
Params: enc,
|
||||||
|
Value: types.NewInt(0), // TODO: need to ensure sufficient collateral
|
||||||
|
GasLimit: types.NewInt(1000000 /* i dont know help */),
|
||||||
|
GasPrice: types.NewInt(1),
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Info("submitting precommit for sector: ", sector.SectorID)
|
||||||
|
smsg, err := m.api.MpoolPushMessage(ctx.Context(), msg)
|
||||||
|
if err != nil {
|
||||||
|
return ctx.Send(SectorPreCommitFailed{xerrors.Errorf("pushing message to mpool: %w", err)})
|
||||||
|
}
|
||||||
|
|
||||||
|
return ctx.Send(SectorPreCommitted{message: smsg.Cid()})
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *Sealing) handlePreCommitted(ctx statemachine.Context, sector SectorInfo) error {
|
||||||
|
// would be ideal to just use the events.Called handler, but it wouldnt be able to handle individual message timeouts
|
||||||
|
log.Info("Sector precommitted: ", sector.SectorID)
|
||||||
|
mw, err := m.api.StateWaitMsg(ctx.Context(), *sector.PreCommitMessage)
|
||||||
|
if err != nil {
|
||||||
|
return ctx.Send(SectorPreCommitFailed{err})
|
||||||
|
}
|
||||||
|
|
||||||
|
if mw.Receipt.ExitCode != 0 {
|
||||||
|
log.Error("sector precommit failed: ", mw.Receipt.ExitCode)
|
||||||
|
err := xerrors.Errorf("sector precommit failed: %d", mw.Receipt.ExitCode)
|
||||||
|
return ctx.Send(SectorPreCommitFailed{err})
|
||||||
|
}
|
||||||
|
log.Info("precommit message landed on chain: ", sector.SectorID)
|
||||||
|
|
||||||
|
randHeight := mw.TipSet.Height() + build.InteractivePoRepDelay - 1 // -1 because of how the messages are applied
|
||||||
|
log.Infof("precommit for sector %d made it on chain, will start proof computation at height %d", sector.SectorID, randHeight)
|
||||||
|
|
||||||
|
err = m.events.ChainAt(func(ectx context.Context, ts *types.TipSet, curH uint64) error {
|
||||||
|
rand, err := m.api.ChainGetRandomness(ectx, ts.Key(), int64(randHeight))
|
||||||
|
if err != nil {
|
||||||
|
err = xerrors.Errorf("failed to get randomness for computing seal proof: %w", err)
|
||||||
|
|
||||||
|
ctx.Send(SectorFatalError{error: err})
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
ctx.Send(SectorSeedReady{seed: SealSeed{
|
||||||
|
BlockHeight: randHeight,
|
||||||
|
TicketBytes: rand,
|
||||||
|
}})
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}, func(ctx context.Context, ts *types.TipSet) error {
|
||||||
|
log.Warn("revert in interactive commit sector step")
|
||||||
|
// TODO: need to cancel running process and restart...
|
||||||
|
return nil
|
||||||
|
}, build.InteractivePoRepConfidence, mw.TipSet.Height()+build.InteractivePoRepDelay)
|
||||||
|
if err != nil {
|
||||||
|
log.Warn("waitForPreCommitMessage ChainAt errored: ", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *Sealing) handleCommitting(ctx statemachine.Context, sector SectorInfo) error {
|
||||||
|
log.Info("scheduling seal proof computation...")
|
||||||
|
|
||||||
|
proof, err := m.sb.SealCommit(ctx.Context(), sector.SectorID, sector.Ticket.SB(), sector.Seed.SB(), sector.pieceInfos(), sector.rspco())
|
||||||
|
if err != nil {
|
||||||
|
return ctx.Send(SectorSealCommitFailed{xerrors.Errorf("computing seal proof failed: %w", err)})
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO: Consider splitting states and persist proof for faster recovery
|
||||||
|
|
||||||
|
params := &actors.SectorProveCommitInfo{
|
||||||
|
Proof: proof,
|
||||||
|
SectorID: sector.SectorID,
|
||||||
|
DealIDs: sector.deals(),
|
||||||
|
}
|
||||||
|
|
||||||
|
enc, aerr := actors.SerializeParams(params)
|
||||||
|
if aerr != nil {
|
||||||
|
return ctx.Send(SectorCommitFailed{xerrors.Errorf("could not serialize commit sector parameters: %w", aerr)})
|
||||||
|
}
|
||||||
|
|
||||||
|
msg := &types.Message{
|
||||||
|
To: m.maddr,
|
||||||
|
From: m.worker,
|
||||||
|
Method: actors.MAMethods.ProveCommitSector,
|
||||||
|
Params: enc,
|
||||||
|
Value: types.NewInt(0), // TODO: need to ensure sufficient collateral
|
||||||
|
GasLimit: types.NewInt(1000000 /* i dont know help */),
|
||||||
|
GasPrice: types.NewInt(1),
|
||||||
|
}
|
||||||
|
|
||||||
|
smsg, err := m.api.MpoolPushMessage(ctx.Context(), msg)
|
||||||
|
if err != nil {
|
||||||
|
return ctx.Send(SectorCommitFailed{xerrors.Errorf("pushing message to mpool: %w", err)})
|
||||||
|
}
|
||||||
|
|
||||||
|
return ctx.Send(SectorCommitted{
|
||||||
|
proof: proof,
|
||||||
|
message: smsg.Cid(),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *Sealing) handleCommitWait(ctx statemachine.Context, sector SectorInfo) error {
|
||||||
|
if sector.CommitMessage == nil {
|
||||||
|
log.Errorf("sector %d entered commit wait state without a message cid", sector.SectorID)
|
||||||
|
return ctx.Send(SectorCommitFailed{xerrors.Errorf("entered commit wait with no commit cid")})
|
||||||
|
}
|
||||||
|
|
||||||
|
mw, err := m.api.StateWaitMsg(ctx.Context(), *sector.CommitMessage)
|
||||||
|
if err != nil {
|
||||||
|
return ctx.Send(SectorCommitFailed{xerrors.Errorf("failed to wait for porep inclusion: %w", err)})
|
||||||
|
}
|
||||||
|
|
||||||
|
if mw.Receipt.ExitCode != 0 {
|
||||||
|
log.Errorf("UNHANDLED: submitting sector proof failed (exit=%d, msg=%s) (t:%x; s:%x(%d); p:%x)", mw.Receipt.ExitCode, sector.CommitMessage, sector.Ticket.TicketBytes, sector.Seed.TicketBytes, sector.Seed.BlockHeight, sector.Proof)
|
||||||
|
return xerrors.Errorf("UNHANDLED: submitting sector proof failed (exit: %d)", mw.Receipt.ExitCode)
|
||||||
|
}
|
||||||
|
|
||||||
|
return ctx.Send(SectorProving{})
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *Sealing) handleFaulty(ctx statemachine.Context, sector SectorInfo) error {
|
||||||
|
// TODO: check if the fault has already been reported, and that this sector is even valid
|
||||||
|
|
||||||
|
// TODO: coalesce faulty sector reporting
|
||||||
|
bf := types.NewBitField()
|
||||||
|
bf.Set(sector.SectorID)
|
||||||
|
|
||||||
|
fp := &actors.DeclareFaultsParams{bf}
|
||||||
|
_ = fp
|
||||||
|
enc, aerr := actors.SerializeParams(nil)
|
||||||
|
if aerr != nil {
|
||||||
|
return xerrors.Errorf("failed to serialize declare fault params: %w", aerr)
|
||||||
|
}
|
||||||
|
|
||||||
|
msg := &types.Message{
|
||||||
|
To: m.maddr,
|
||||||
|
From: m.worker,
|
||||||
|
Method: actors.MAMethods.DeclareFaults,
|
||||||
|
Params: enc,
|
||||||
|
Value: types.NewInt(0), // TODO: need to ensure sufficient collateral
|
||||||
|
GasLimit: types.NewInt(1000000 /* i dont know help */),
|
||||||
|
GasPrice: types.NewInt(1),
|
||||||
|
}
|
||||||
|
|
||||||
|
smsg, err := m.api.MpoolPushMessage(ctx.Context(), msg)
|
||||||
|
if err != nil {
|
||||||
|
return xerrors.Errorf("failed to push declare faults message to network: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return ctx.Send(SectorFaultReported{reportMsg: smsg.Cid()})
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *Sealing) handleFaultReported(ctx statemachine.Context, sector SectorInfo) error {
|
||||||
|
if sector.FaultReportMsg == nil {
|
||||||
|
return xerrors.Errorf("entered fault reported state without a FaultReportMsg cid")
|
||||||
|
}
|
||||||
|
|
||||||
|
mw, err := m.api.StateWaitMsg(ctx.Context(), *sector.FaultReportMsg)
|
||||||
|
if err != nil {
|
||||||
|
return xerrors.Errorf("failed to wait for fault declaration: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if mw.Receipt.ExitCode != 0 {
|
||||||
|
log.Errorf("UNHANDLED: declaring sector fault failed (exit=%d, msg=%s) (id: %d)", mw.Receipt.ExitCode, *sector.FaultReportMsg, sector.SectorID)
|
||||||
|
return xerrors.Errorf("UNHANDLED: submitting fault declaration failed (exit %d)", mw.Receipt.ExitCode)
|
||||||
|
}
|
||||||
|
|
||||||
|
return ctx.Send(SectorFaultedFinal{})
|
||||||
|
}
|
106
sector_types.go
Normal file
106
sector_types.go
Normal file
@ -0,0 +1,106 @@
|
|||||||
|
package sealing
|
||||||
|
|
||||||
|
import (
|
||||||
|
sectorbuilder "github.com/filecoin-project/go-sectorbuilder"
|
||||||
|
"github.com/ipfs/go-cid"
|
||||||
|
|
||||||
|
"github.com/filecoin-project/lotus/api"
|
||||||
|
)
|
||||||
|
|
||||||
|
type SealTicket struct {
|
||||||
|
BlockHeight uint64
|
||||||
|
TicketBytes []byte
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *SealTicket) SB() sectorbuilder.SealTicket {
|
||||||
|
out := sectorbuilder.SealTicket{BlockHeight: t.BlockHeight}
|
||||||
|
copy(out.TicketBytes[:], t.TicketBytes)
|
||||||
|
return out
|
||||||
|
}
|
||||||
|
|
||||||
|
type SealSeed struct {
|
||||||
|
BlockHeight uint64
|
||||||
|
TicketBytes []byte
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *SealSeed) SB() sectorbuilder.SealSeed {
|
||||||
|
out := sectorbuilder.SealSeed{BlockHeight: t.BlockHeight}
|
||||||
|
copy(out.TicketBytes[:], t.TicketBytes)
|
||||||
|
return out
|
||||||
|
}
|
||||||
|
|
||||||
|
type Piece struct {
|
||||||
|
DealID uint64
|
||||||
|
|
||||||
|
Size uint64
|
||||||
|
CommP []byte
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *Piece) ppi() (out sectorbuilder.PublicPieceInfo) {
|
||||||
|
out.Size = p.Size
|
||||||
|
copy(out.CommP[:], p.CommP)
|
||||||
|
return out
|
||||||
|
}
|
||||||
|
|
||||||
|
type SectorInfo struct {
|
||||||
|
State api.SectorState
|
||||||
|
SectorID uint64
|
||||||
|
Nonce uint64
|
||||||
|
|
||||||
|
// Packing
|
||||||
|
|
||||||
|
Pieces []Piece
|
||||||
|
|
||||||
|
// PreCommit
|
||||||
|
CommD []byte
|
||||||
|
CommR []byte
|
||||||
|
Proof []byte
|
||||||
|
Ticket SealTicket
|
||||||
|
|
||||||
|
PreCommitMessage *cid.Cid
|
||||||
|
|
||||||
|
// PreCommitted
|
||||||
|
Seed SealSeed
|
||||||
|
|
||||||
|
// Committing
|
||||||
|
CommitMessage *cid.Cid
|
||||||
|
|
||||||
|
// Faults
|
||||||
|
FaultReportMsg *cid.Cid
|
||||||
|
|
||||||
|
// Debug
|
||||||
|
LastErr string
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *SectorInfo) pieceInfos() []sectorbuilder.PublicPieceInfo {
|
||||||
|
out := make([]sectorbuilder.PublicPieceInfo, len(t.Pieces))
|
||||||
|
for i, piece := range t.Pieces {
|
||||||
|
out[i] = piece.ppi()
|
||||||
|
}
|
||||||
|
return out
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *SectorInfo) deals() []uint64 {
|
||||||
|
out := make([]uint64, len(t.Pieces))
|
||||||
|
for i, piece := range t.Pieces {
|
||||||
|
out[i] = piece.DealID
|
||||||
|
}
|
||||||
|
return out
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *SectorInfo) existingPieces() []uint64 {
|
||||||
|
out := make([]uint64, len(t.Pieces))
|
||||||
|
for i, piece := range t.Pieces {
|
||||||
|
out[i] = piece.Size
|
||||||
|
}
|
||||||
|
return out
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *SectorInfo) rspco() sectorbuilder.RawSealPreCommitOutput {
|
||||||
|
var out sectorbuilder.RawSealPreCommitOutput
|
||||||
|
|
||||||
|
copy(out.CommD[:], t.CommD)
|
||||||
|
copy(out.CommR[:], t.CommR)
|
||||||
|
|
||||||
|
return out
|
||||||
|
}
|
57
sector_utils.go
Normal file
57
sector_utils.go
Normal file
@ -0,0 +1,57 @@
|
|||||||
|
package sealing
|
||||||
|
|
||||||
|
import (
|
||||||
|
"math/bits"
|
||||||
|
|
||||||
|
sectorbuilder "github.com/filecoin-project/go-sectorbuilder"
|
||||||
|
)
|
||||||
|
|
||||||
|
func fillersFromRem(toFill uint64) ([]uint64, error) {
|
||||||
|
// Convert to in-sector bytes for easier math:
|
||||||
|
//
|
||||||
|
// Sector size to user bytes ratio is constant, e.g. for 1024B we have 1016B
|
||||||
|
// of user-usable data.
|
||||||
|
//
|
||||||
|
// (1024/1016 = 128/127)
|
||||||
|
//
|
||||||
|
// Given that we can get sector size by simply adding 1/127 of the user
|
||||||
|
// bytes
|
||||||
|
//
|
||||||
|
// (we convert to sector bytes as they are nice round binary numbers)
|
||||||
|
|
||||||
|
toFill += toFill / 127
|
||||||
|
|
||||||
|
// We need to fill the sector with pieces that are powers of 2. Conveniently
|
||||||
|
// computers store numbers in binary, which means we can look at 1s to get
|
||||||
|
// all the piece sizes we need to fill the sector. It also means that number
|
||||||
|
// of pieces is the number of 1s in the number of remaining bytes to fill
|
||||||
|
out := make([]uint64, bits.OnesCount64(toFill))
|
||||||
|
for i := range out {
|
||||||
|
// Extract the next lowest non-zero bit
|
||||||
|
next := bits.TrailingZeros64(toFill)
|
||||||
|
psize := uint64(1) << next
|
||||||
|
// e.g: if the number is 0b010100, psize will be 0b000100
|
||||||
|
|
||||||
|
// set that bit to 0 by XORing it, so the next iteration looks at the
|
||||||
|
// next bit
|
||||||
|
toFill ^= psize
|
||||||
|
|
||||||
|
// Add the piece size to the list of pieces we need to create
|
||||||
|
out[i] = sectorbuilder.UserBytesForSectorSize(psize)
|
||||||
|
}
|
||||||
|
return out, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *Sealing) ListSectors() ([]SectorInfo, error) {
|
||||||
|
var sectors []SectorInfo
|
||||||
|
if err := m.sectors.List(§ors); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return sectors, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *Sealing) GetSectorInfo(sid uint64) (SectorInfo, error) {
|
||||||
|
var out SectorInfo
|
||||||
|
err := m.sectors.Get(sid).Get(&out)
|
||||||
|
return out, err
|
||||||
|
}
|
46
sector_utils_test.go
Normal file
46
sector_utils_test.go
Normal file
@ -0,0 +1,46 @@
|
|||||||
|
package sealing
|
||||||
|
|
||||||
|
import (
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/stretchr/testify/assert"
|
||||||
|
|
||||||
|
sectorbuilder "github.com/filecoin-project/go-sectorbuilder"
|
||||||
|
)
|
||||||
|
|
||||||
|
func testFill(t *testing.T, n uint64, exp []uint64) {
|
||||||
|
f, err := fillersFromRem(n)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
assert.Equal(t, exp, f)
|
||||||
|
|
||||||
|
var sum uint64
|
||||||
|
for _, u := range f {
|
||||||
|
sum += u
|
||||||
|
}
|
||||||
|
assert.Equal(t, n, sum)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestFillersFromRem(t *testing.T) {
|
||||||
|
for i := 8; i < 32; i++ {
|
||||||
|
// single
|
||||||
|
ub := sectorbuilder.UserBytesForSectorSize(uint64(1) << i)
|
||||||
|
testFill(t, ub, []uint64{ub})
|
||||||
|
|
||||||
|
// 2
|
||||||
|
ub = sectorbuilder.UserBytesForSectorSize(uint64(5) << i)
|
||||||
|
ub1 := sectorbuilder.UserBytesForSectorSize(uint64(1) << i)
|
||||||
|
ub3 := sectorbuilder.UserBytesForSectorSize(uint64(4) << i)
|
||||||
|
testFill(t, ub, []uint64{ub1, ub3})
|
||||||
|
|
||||||
|
// 4
|
||||||
|
ub = sectorbuilder.UserBytesForSectorSize(uint64(15) << i)
|
||||||
|
ub2 := sectorbuilder.UserBytesForSectorSize(uint64(2) << i)
|
||||||
|
ub4 := sectorbuilder.UserBytesForSectorSize(uint64(8) << i)
|
||||||
|
testFill(t, ub, []uint64{ub1, ub2, ub3, ub4})
|
||||||
|
|
||||||
|
// different 2
|
||||||
|
ub = sectorbuilder.UserBytesForSectorSize(uint64(9) << i)
|
||||||
|
testFill(t, ub, []uint64{ub1, ub4})
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
136
sectors.go
Normal file
136
sectors.go
Normal file
@ -0,0 +1,136 @@
|
|||||||
|
package sealing
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"github.com/ipfs/go-datastore"
|
||||||
|
"github.com/ipfs/go-datastore/namespace"
|
||||||
|
"io"
|
||||||
|
|
||||||
|
"github.com/filecoin-project/lotus/lib/padreader"
|
||||||
|
logging "github.com/ipfs/go-log/v2"
|
||||||
|
"golang.org/x/xerrors"
|
||||||
|
|
||||||
|
"github.com/filecoin-project/go-address"
|
||||||
|
"github.com/filecoin-project/go-sectorbuilder"
|
||||||
|
"github.com/ipfs/go-cid"
|
||||||
|
|
||||||
|
"github.com/filecoin-project/lotus/api"
|
||||||
|
"github.com/filecoin-project/lotus/chain/events"
|
||||||
|
"github.com/filecoin-project/lotus/chain/store"
|
||||||
|
"github.com/filecoin-project/lotus/chain/types"
|
||||||
|
"github.com/filecoin-project/lotus/lib/statemachine"
|
||||||
|
)
|
||||||
|
|
||||||
|
const SectorStorePrefix = "/sectors"
|
||||||
|
|
||||||
|
var log = logging.Logger("sectors")
|
||||||
|
|
||||||
|
type TicketFn func(context.Context) (*sectorbuilder.SealTicket, error)
|
||||||
|
|
||||||
|
type sectorsApi interface { // TODO: trim down
|
||||||
|
// Call a read only method on actors (no interaction with the chain required)
|
||||||
|
StateCall(ctx context.Context, msg *types.Message, ts *types.TipSet) (*types.MessageReceipt, error)
|
||||||
|
StateMinerWorker(context.Context, address.Address, *types.TipSet) (address.Address, error)
|
||||||
|
StateMinerElectionPeriodStart(ctx context.Context, actor address.Address, ts *types.TipSet) (uint64, error)
|
||||||
|
StateMinerSectors(context.Context, address.Address, *types.TipSet) ([]*api.ChainSectorInfo, error)
|
||||||
|
StateMinerProvingSet(context.Context, address.Address, *types.TipSet) ([]*api.ChainSectorInfo, error)
|
||||||
|
StateMinerSectorSize(context.Context, address.Address, *types.TipSet) (uint64, error)
|
||||||
|
StateWaitMsg(context.Context, cid.Cid) (*api.MsgWait, error) // TODO: removeme eventually
|
||||||
|
StateGetActor(ctx context.Context, actor address.Address, ts *types.TipSet) (*types.Actor, error)
|
||||||
|
StateGetReceipt(context.Context, cid.Cid, *types.TipSet) (*types.MessageReceipt, error)
|
||||||
|
|
||||||
|
MpoolPushMessage(context.Context, *types.Message) (*types.SignedMessage, error)
|
||||||
|
|
||||||
|
ChainHead(context.Context) (*types.TipSet, error)
|
||||||
|
ChainNotify(context.Context) (<-chan []*store.HeadChange, error)
|
||||||
|
ChainGetRandomness(context.Context, types.TipSetKey, int64) ([]byte, error)
|
||||||
|
ChainGetTipSetByHeight(context.Context, uint64, *types.TipSet) (*types.TipSet, error)
|
||||||
|
ChainGetBlockMessages(context.Context, cid.Cid) (*api.BlockMessages, error)
|
||||||
|
|
||||||
|
WalletSign(context.Context, address.Address, []byte) (*types.Signature, error)
|
||||||
|
WalletBalance(context.Context, address.Address) (types.BigInt, error)
|
||||||
|
WalletHas(context.Context, address.Address) (bool, error)
|
||||||
|
}
|
||||||
|
|
||||||
|
type Sealing struct {
|
||||||
|
api sectorsApi
|
||||||
|
events *events.Events
|
||||||
|
|
||||||
|
maddr address.Address
|
||||||
|
worker address.Address
|
||||||
|
|
||||||
|
sb sectorbuilder.Interface
|
||||||
|
sectors *statemachine.StateGroup
|
||||||
|
tktFn TicketFn
|
||||||
|
}
|
||||||
|
|
||||||
|
func New(api sectorsApi, events *events.Events, maddr address.Address, worker address.Address, ds datastore.Batching, sb sectorbuilder.Interface, tktFn TicketFn) *Sealing {
|
||||||
|
s := &Sealing{
|
||||||
|
api: api,
|
||||||
|
events: events,
|
||||||
|
|
||||||
|
maddr: maddr,
|
||||||
|
worker: worker,
|
||||||
|
sb: sb,
|
||||||
|
tktFn: tktFn,
|
||||||
|
}
|
||||||
|
|
||||||
|
s.sectors = statemachine.New(namespace.Wrap(ds, datastore.NewKey(SectorStorePrefix)), s, SectorInfo{})
|
||||||
|
|
||||||
|
return s
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *Sealing) Run(ctx context.Context) error {
|
||||||
|
m.events = events.NewEvents(ctx, m.api)
|
||||||
|
|
||||||
|
if err := m.restartSectors(ctx); err != nil {
|
||||||
|
log.Errorf("%+v", err)
|
||||||
|
return xerrors.Errorf("failed load sector states: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *Sealing) Stop(ctx context.Context) error {
|
||||||
|
return m.sectors.Stop(ctx)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *Sealing) AllocatePiece(size uint64) (sectorID uint64, offset uint64, err error) {
|
||||||
|
if padreader.PaddedSize(size) != size {
|
||||||
|
return 0, 0, xerrors.Errorf("cannot allocate unpadded piece")
|
||||||
|
}
|
||||||
|
|
||||||
|
sid, err := m.sb.AcquireSectorId() // TODO: Put more than one thing in a sector
|
||||||
|
if err != nil {
|
||||||
|
return 0, 0, xerrors.Errorf("acquiring sector ID: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// offset hard-coded to 0 since we only put one thing in a sector for now
|
||||||
|
return sid, 0, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *Sealing) SealPiece(ctx context.Context, size uint64, r io.Reader, sectorID uint64, dealID uint64) error {
|
||||||
|
log.Infof("Seal piece for deal %d", dealID)
|
||||||
|
|
||||||
|
ppi, err := m.sb.AddPiece(size, sectorID, r, []uint64{})
|
||||||
|
if err != nil {
|
||||||
|
return xerrors.Errorf("adding piece to sector: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return m.newSector(ctx, sectorID, dealID, ppi)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *Sealing) newSector(ctx context.Context, sid uint64, dealID uint64, ppi sectorbuilder.PublicPieceInfo) error {
|
||||||
|
return m.sectors.Send(sid, SectorStart{
|
||||||
|
id: sid,
|
||||||
|
pieces: []Piece{
|
||||||
|
{
|
||||||
|
DealID: dealID,
|
||||||
|
|
||||||
|
Size: ppi.Size,
|
||||||
|
CommP: ppi.CommP[:],
|
||||||
|
},
|
||||||
|
},
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
79
sectors_fsm_test.go
Normal file
79
sectors_fsm_test.go
Normal file
@ -0,0 +1,79 @@
|
|||||||
|
package sealing
|
||||||
|
|
||||||
|
import (
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
|
||||||
|
"github.com/filecoin-project/lotus/api"
|
||||||
|
"github.com/filecoin-project/lotus/lib/statemachine"
|
||||||
|
)
|
||||||
|
|
||||||
|
func (t *test) planSingle(evt interface{}) {
|
||||||
|
_, err := t.m.plan([]statemachine.Event{{evt}}, t.state)
|
||||||
|
require.NoError(t.t, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
type test struct {
|
||||||
|
m *Miner
|
||||||
|
t *testing.T
|
||||||
|
state *SectorInfo
|
||||||
|
|
||||||
|
next func(statemachine.Context, SectorInfo) error
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestHappyPath(t *testing.T) {
|
||||||
|
m := test{
|
||||||
|
m: &Miner{},
|
||||||
|
t: t,
|
||||||
|
state: &SectorInfo{State: api.Packing},
|
||||||
|
}
|
||||||
|
|
||||||
|
m.planSingle(SectorPacked{})
|
||||||
|
require.Equal(m.t, m.state.State, api.Unsealed)
|
||||||
|
|
||||||
|
m.planSingle(SectorSealed{})
|
||||||
|
require.Equal(m.t, m.state.State, api.PreCommitting)
|
||||||
|
|
||||||
|
m.planSingle(SectorPreCommitted{})
|
||||||
|
require.Equal(m.t, m.state.State, api.PreCommitted)
|
||||||
|
|
||||||
|
m.planSingle(SectorSeedReady{})
|
||||||
|
require.Equal(m.t, m.state.State, api.Committing)
|
||||||
|
|
||||||
|
m.planSingle(SectorCommitted{})
|
||||||
|
require.Equal(m.t, m.state.State, api.CommitWait)
|
||||||
|
|
||||||
|
m.planSingle(SectorProving{})
|
||||||
|
require.Equal(m.t, m.state.State, api.Proving)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestSeedRevert(t *testing.T) {
|
||||||
|
m := test{
|
||||||
|
m: &Miner{},
|
||||||
|
t: t,
|
||||||
|
state: &SectorInfo{State: api.Packing},
|
||||||
|
}
|
||||||
|
|
||||||
|
m.planSingle(SectorPacked{})
|
||||||
|
require.Equal(m.t, m.state.State, api.Unsealed)
|
||||||
|
|
||||||
|
m.planSingle(SectorSealed{})
|
||||||
|
require.Equal(m.t, m.state.State, api.PreCommitting)
|
||||||
|
|
||||||
|
m.planSingle(SectorPreCommitted{})
|
||||||
|
require.Equal(m.t, m.state.State, api.PreCommitted)
|
||||||
|
|
||||||
|
m.planSingle(SectorSeedReady{})
|
||||||
|
require.Equal(m.t, m.state.State, api.Committing)
|
||||||
|
|
||||||
|
_, err := m.m.plan([]statemachine.Event{{SectorSeedReady{}}, {SectorCommitted{}}}, m.state)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Equal(m.t, m.state.State, api.Committing)
|
||||||
|
|
||||||
|
m.planSingle(SectorCommitted{})
|
||||||
|
require.Equal(m.t, m.state.State, api.CommitWait)
|
||||||
|
|
||||||
|
m.planSingle(SectorProving{})
|
||||||
|
require.Equal(m.t, m.state.State, api.Proving)
|
||||||
|
}
|
56
sectors_test.go
Normal file
56
sectors_test.go
Normal file
@ -0,0 +1,56 @@
|
|||||||
|
package sealing
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"gotest.tools/assert"
|
||||||
|
|
||||||
|
"github.com/filecoin-project/go-cbor-util"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestSectorInfoSelialization(t *testing.T) {
|
||||||
|
si := &SectorInfo{
|
||||||
|
State: 123,
|
||||||
|
SectorID: 234,
|
||||||
|
Nonce: 345,
|
||||||
|
Pieces: []Piece{{
|
||||||
|
DealID: 1234,
|
||||||
|
Size: 5,
|
||||||
|
CommP: []byte{3},
|
||||||
|
}},
|
||||||
|
CommD: []byte{32, 4},
|
||||||
|
CommR: nil,
|
||||||
|
Proof: nil,
|
||||||
|
Ticket: SealTicket{
|
||||||
|
BlockHeight: 345,
|
||||||
|
TicketBytes: []byte{87, 78, 7, 87},
|
||||||
|
},
|
||||||
|
PreCommitMessage: nil,
|
||||||
|
Seed: SealSeed{},
|
||||||
|
CommitMessage: nil,
|
||||||
|
FaultReportMsg: nil,
|
||||||
|
LastErr: "hi",
|
||||||
|
}
|
||||||
|
|
||||||
|
b, err := cborutil.Dump(si)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
var si2 SectorInfo
|
||||||
|
if err := cborutil.ReadCborRPC(bytes.NewReader(b), &si); err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
assert.Equal(t, si.State, si2.State)
|
||||||
|
assert.Equal(t, si.Nonce, si2.Nonce)
|
||||||
|
assert.Equal(t, si.SectorID, si2.SectorID)
|
||||||
|
|
||||||
|
assert.Equal(t, si.Pieces, si2.Pieces)
|
||||||
|
assert.Equal(t, si.CommD, si2.CommD)
|
||||||
|
assert.Equal(t, si.Ticket, si2.Ticket)
|
||||||
|
|
||||||
|
assert.Equal(t, si, si2)
|
||||||
|
|
||||||
|
}
|
Loading…
Reference in New Issue
Block a user