lpseal: Wip lm-compatible subset of rpc for boost

This commit is contained in:
Łukasz Magiera 2024-01-19 16:07:06 +01:00
parent 8213d7bca9
commit 29584e0f31
4 changed files with 316 additions and 0 deletions

View File

@ -0,0 +1,35 @@
package fakelm
import (
"context"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/storage/sealer/storiface"
"github.com/google/uuid"
"io"
)
// MinimalLMApi is a subset of the LotusMiner API that is exposed by lotus-provider
// for consumption by boost
type MinimalLMApi interface {
ActorAddress(context.Context) (address.Address, error)
WorkerJobs(context.Context) (map[uuid.UUID][]storiface.WorkerJob, error)
SectorsStatus(ctx context.Context, sid abi.SectorNumber, showOnChainInfo bool) (api.SectorInfo, error)
SectorsList(context.Context) ([]abi.SectorNumber, error)
SectorsSummary(ctx context.Context) (map[api.SectorState]int, error)
SectorsListInStates(context.Context, []api.SectorState) ([]abi.SectorNumber, error)
StorageRedeclareLocal(context.Context, *storiface.ID, bool) error
StorageList(context.Context) (map[storiface.ID][]storiface.Decl, error)
UnsealSector(ctx context.Context, sectorID abi.SectorNumber, offset abi.UnpaddedPieceSize, length abi.UnpaddedPieceSize) (io.ReadCloser, error)
IsUnsealed(ctx context.Context, sectorID abi.SectorNumber, offset abi.UnpaddedPieceSize, length abi.UnpaddedPieceSize) (bool, error)
ComputeDataCid(ctx context.Context, pieceSize abi.UnpaddedPieceSize, pieceData storiface.Data) (abi.PieceInfo, error)
SectorAddPieceToAny(ctx context.Context, size abi.UnpaddedPieceSize, r storiface.Data, d api.PieceDealInfo) (api.SectorOffset, error)
}

View File

@ -0,0 +1,264 @@
package fakelm
import (
"context"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/storage/paths"
sealing "github.com/filecoin-project/lotus/storage/pipeline"
"github.com/filecoin-project/lotus/storage/sealer/storiface"
"github.com/google/uuid"
"golang.org/x/xerrors"
"io"
)
type LMRPCProvider struct {
si paths.SectorIndex
maddr address.Address // lotus-miner RPC is single-actor
minerID abi.ActorID
ssize abi.SectorSize
}
func (l *LMRPCProvider) ActorAddress(ctx context.Context) (address.Address, error) {
return l.maddr, nil
}
func (l *LMRPCProvider) WorkerJobs(ctx context.Context) (map[uuid.UUID][]storiface.WorkerJob, error) {
// correct enough
return map[uuid.UUID][]storiface.WorkerJob{}, nil
}
func (l *LMRPCProvider) SectorsStatus(ctx context.Context, sid abi.SectorNumber, showOnChainInfo bool) (api.SectorInfo, error) {
si, err := l.si.StorageFindSector(ctx, abi.SectorID{Miner: l.minerID, Number: sid}, storiface.FTSealed|storiface.FTCache, 0, false)
if err != nil {
return api.SectorInfo{}, err
}
if len(si) == 0 {
return api.SectorInfo{
SectorID: sid,
State: api.SectorState(sealing.UndefinedSectorState),
CommD: nil,
CommR: nil,
Proof: nil,
Deals: nil,
Pieces: nil,
Ticket: api.SealTicket{},
Seed: api.SealSeed{},
PreCommitMsg: nil,
CommitMsg: nil,
Retries: 0,
ToUpgrade: false,
ReplicaUpdateMessage: nil,
LastErr: "",
Log: nil,
SealProof: 0,
Activation: 0,
Expiration: 0,
DealWeight: abi.DealWeight{},
VerifiedDealWeight: abi.DealWeight{},
InitialPledge: abi.TokenAmount{},
OnTime: 0,
Early: 0,
}, nil
}
var state api.SectorState = api.SectorState(sealing.Proving)
if !si[0].CanStore {
state = api.SectorState(sealing.PreCommit1)
}
// todo improve this with on-chain info
return api.SectorInfo{
SectorID: sid,
State: state,
CommD: nil,
CommR: nil,
Proof: nil,
Deals: nil,
Pieces: nil,
Ticket: api.SealTicket{},
Seed: api.SealSeed{},
PreCommitMsg: nil,
CommitMsg: nil,
Retries: 0,
ToUpgrade: false,
ReplicaUpdateMessage: nil,
LastErr: "",
Log: nil,
SealProof: 0,
Activation: 0,
Expiration: 0,
DealWeight: abi.DealWeight{},
VerifiedDealWeight: abi.DealWeight{},
InitialPledge: abi.TokenAmount{},
OnTime: 0,
Early: 0,
}, nil
}
func (l *LMRPCProvider) SectorsList(ctx context.Context) ([]abi.SectorNumber, error) {
decls, err := l.si.StorageList(ctx)
if err != nil {
return nil, err
}
var out []abi.SectorNumber
for _, decl := range decls {
for _, s := range decl {
if s.Miner != l.minerID {
continue
}
out = append(out, s.SectorID.Number)
}
}
return out, nil
}
type sectorParts struct {
sealed, unsealed, cache bool
inStorage bool
}
func (l *LMRPCProvider) SectorsSummary(ctx context.Context) (map[api.SectorState]int, error) {
decls, err := l.si.StorageList(ctx)
if err != nil {
return nil, err
}
states := map[abi.SectorID]sectorParts{}
for si, decll := range decls {
sinfo, err := l.si.StorageInfo(ctx, si)
if err != nil {
return nil, err
}
for _, decl := range decll {
if decl.Miner != l.minerID {
continue
}
state := states[abi.SectorID{Miner: decl.Miner, Number: decl.SectorID.Number}]
state.sealed = state.sealed || decl.Has(storiface.FTSealed)
state.unsealed = state.unsealed || decl.Has(storiface.FTUnsealed)
state.cache = state.cache || decl.Has(storiface.FTCache)
state.inStorage = state.inStorage || sinfo.CanStore
states[abi.SectorID{Miner: decl.Miner, Number: decl.SectorID.Number}] = state
}
}
out := map[api.SectorState]int{}
for _, state := range states {
switch {
case state.sealed && state.inStorage:
out[api.SectorState(sealing.Proving)]++
default:
// not even close to correct, but good enough for now
out[api.SectorState(sealing.PreCommit1)]++
}
}
return out, nil
}
func (l *LMRPCProvider) SectorsListInStates(ctx context.Context, want []api.SectorState) ([]abi.SectorNumber, error) {
decls, err := l.StorageList(ctx)
if err != nil {
return nil, err
}
wantProving, wantPrecommit1 := false, false
for _, s := range want {
switch s {
case api.SectorState(sealing.Proving):
wantProving = true
case api.SectorState(sealing.PreCommit1):
wantPrecommit1 = true
}
}
states := map[abi.SectorID]sectorParts{}
for si, decll := range decls {
sinfo, err := l.si.StorageInfo(ctx, si)
if err != nil {
return nil, err
}
for _, decl := range decll {
if decl.Miner != l.minerID {
continue
}
state := states[abi.SectorID{Miner: decl.Miner, Number: decl.SectorID.Number}]
state.sealed = state.sealed || decl.Has(storiface.FTSealed)
state.unsealed = state.unsealed || decl.Has(storiface.FTUnsealed)
state.cache = state.cache || decl.Has(storiface.FTCache)
state.inStorage = state.inStorage || sinfo.CanStore
states[abi.SectorID{Miner: decl.Miner, Number: decl.SectorID.Number}] = state
}
}
var out []abi.SectorNumber
for id, state := range states {
switch {
case state.sealed && state.inStorage:
if wantProving {
out = append(out, id.Number)
}
default:
// not even close to correct, but good enough for now
if wantPrecommit1 {
out = append(out, id.Number)
}
}
}
return out, nil
}
func (l *LMRPCProvider) StorageRedeclareLocal(ctx context.Context, id *storiface.ID, b bool) error {
// so this rescans and redeclares sectors on lotus-miner; whyyy is boost even calling this?
return nil
}
func (l *LMRPCProvider) StorageList(ctx context.Context) (map[storiface.ID][]storiface.Decl, error) {
return l.si.StorageList(ctx)
}
func (l *LMRPCProvider) UnsealSector(ctx context.Context, sectorID abi.SectorNumber, offset abi.UnpaddedPieceSize, length abi.UnpaddedPieceSize) (io.ReadCloser, error) {
return nil, xerrors.Errorf("not supported")
}
func (l *LMRPCProvider) IsUnsealed(ctx context.Context, sectorNum abi.SectorNumber, offset abi.UnpaddedPieceSize, length abi.UnpaddedPieceSize) (bool, error) {
sectorID := abi.SectorID{Miner: l.minerID, Number: sectorNum}
si, err := l.si.StorageFindSector(ctx, sectorID, storiface.FTUnsealed, 0, false)
if err != nil {
return false, err
}
// yes, yes, technically sectors can be partially unsealed, but that is never done in practice
// and can't even be easily done with the current implementation
return len(si) > 0, nil
}
func (l *LMRPCProvider) ComputeDataCid(ctx context.Context, pieceSize abi.UnpaddedPieceSize, pieceData storiface.Data) (abi.PieceInfo, error) {
return abi.PieceInfo{}, xerrors.Errorf("not supported")
}
func (l *LMRPCProvider) SectorAddPieceToAny(ctx context.Context, size abi.UnpaddedPieceSize, r storiface.Data, d api.PieceDealInfo) (api.SectorOffset, error) {
if d.DealProposal.PieceSize != abi.PaddedPieceSize(l.ssize) {
return api.SectorOffset{}, xerrors.Errorf("only full-sector pieces are supported")
}
}
var _ MinimalLMApi = &LMRPCProvider{}

View File

@ -110,6 +110,8 @@ func (s *SDRTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done bo
return false, xerrors.Errorf("getting miner address: %w", err)
}
// FAIL: api may be down
// FAIL-RESP: rely on harmony retry
ticket, ticketEpoch, err := s.getTicket(ctx, maddr)
if err != nil {
return false, xerrors.Errorf("getting ticket: %w", err)
@ -117,6 +119,14 @@ func (s *SDRTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done bo
// do the SDR!!
// FAIL: storage may not have enough space
// FAIL-RESP: rely on harmony retry
// LATEFAIL: compute error in sdr
// LATEFAIL-RESP: Check in Trees task should catch this; Will retry computing
// Trees; After one retry, it should return the sector to the
// SDR stage; max number of retries should be configurable
err = s.sc.GenerateSDR(ctx, sref, ticket, commd)
if err != nil {
return false, xerrors.Errorf("generating sdr: %w", err)

View File

@ -96,6 +96,9 @@ func (t *TreesTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done
return false, xerrors.Errorf("computing tree d: %w", err)
}
// todo: Sooo tree-d contains exactly the unsealed data in the prefix
// when we finalize we can totally just truncate that file and move it to unsealed !!
// R / C
sealed, unsealed, err := t.sc.TreeRC(ctx, sref, commd)
if err != nil {
@ -106,6 +109,10 @@ func (t *TreesTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done
return false, xerrors.Errorf("tree-d and tree-r/c unsealed CIDs disagree")
}
// todo synth porep
// todo porep challenge check
n, err := t.db.Exec(ctx, `UPDATE sectors_sdr_pipeline
SET after_tree_r = true, after_tree_c = true, after_tree_d = true, tree_r_cid = $3, tree_d_cid = $4
WHERE sp_id = $1 AND sector_number = $2`,