From 29584e0f31c2a8a9f432b8541ab6d6b483df9dde Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Fri, 19 Jan 2024 16:07:06 +0100 Subject: [PATCH] lpseal: Wip lm-compatible subset of rpc for boost --- provider/lpmarket/fakelm/iface.go | 35 ++++ provider/lpmarket/fakelm/lmimpl.go | 264 +++++++++++++++++++++++++++++ provider/lpseal/task_sdr.go | 10 ++ provider/lpseal/task_trees.go | 7 + 4 files changed, 316 insertions(+) create mode 100644 provider/lpmarket/fakelm/iface.go create mode 100644 provider/lpmarket/fakelm/lmimpl.go diff --git a/provider/lpmarket/fakelm/iface.go b/provider/lpmarket/fakelm/iface.go new file mode 100644 index 000000000..4f11fab6d --- /dev/null +++ b/provider/lpmarket/fakelm/iface.go @@ -0,0 +1,35 @@ +package fakelm + +import ( + "context" + "github.com/filecoin-project/go-address" + "github.com/filecoin-project/go-state-types/abi" + "github.com/filecoin-project/lotus/api" + "github.com/filecoin-project/lotus/storage/sealer/storiface" + "github.com/google/uuid" + "io" +) + +// MinimalLMApi is a subset of the LotusMiner API that is exposed by lotus-provider +// for consumption by boost +type MinimalLMApi interface { + ActorAddress(context.Context) (address.Address, error) + + WorkerJobs(context.Context) (map[uuid.UUID][]storiface.WorkerJob, error) + + SectorsStatus(ctx context.Context, sid abi.SectorNumber, showOnChainInfo bool) (api.SectorInfo, error) + + SectorsList(context.Context) ([]abi.SectorNumber, error) + SectorsSummary(ctx context.Context) (map[api.SectorState]int, error) + + SectorsListInStates(context.Context, []api.SectorState) ([]abi.SectorNumber, error) + + StorageRedeclareLocal(context.Context, *storiface.ID, bool) error + StorageList(context.Context) (map[storiface.ID][]storiface.Decl, error) + + UnsealSector(ctx context.Context, sectorID abi.SectorNumber, offset abi.UnpaddedPieceSize, length abi.UnpaddedPieceSize) (io.ReadCloser, error) + IsUnsealed(ctx context.Context, sectorID abi.SectorNumber, offset abi.UnpaddedPieceSize, length abi.UnpaddedPieceSize) (bool, error) + + ComputeDataCid(ctx context.Context, pieceSize abi.UnpaddedPieceSize, pieceData storiface.Data) (abi.PieceInfo, error) + SectorAddPieceToAny(ctx context.Context, size abi.UnpaddedPieceSize, r storiface.Data, d api.PieceDealInfo) (api.SectorOffset, error) +} diff --git a/provider/lpmarket/fakelm/lmimpl.go b/provider/lpmarket/fakelm/lmimpl.go new file mode 100644 index 000000000..099d74681 --- /dev/null +++ b/provider/lpmarket/fakelm/lmimpl.go @@ -0,0 +1,264 @@ +package fakelm + +import ( + "context" + "github.com/filecoin-project/go-address" + "github.com/filecoin-project/go-state-types/abi" + "github.com/filecoin-project/lotus/api" + "github.com/filecoin-project/lotus/storage/paths" + sealing "github.com/filecoin-project/lotus/storage/pipeline" + "github.com/filecoin-project/lotus/storage/sealer/storiface" + "github.com/google/uuid" + "golang.org/x/xerrors" + "io" +) + +type LMRPCProvider struct { + si paths.SectorIndex + + maddr address.Address // lotus-miner RPC is single-actor + minerID abi.ActorID + + ssize abi.SectorSize +} + +func (l *LMRPCProvider) ActorAddress(ctx context.Context) (address.Address, error) { + return l.maddr, nil +} + +func (l *LMRPCProvider) WorkerJobs(ctx context.Context) (map[uuid.UUID][]storiface.WorkerJob, error) { + // correct enough + return map[uuid.UUID][]storiface.WorkerJob{}, nil +} + +func (l *LMRPCProvider) SectorsStatus(ctx context.Context, sid abi.SectorNumber, showOnChainInfo bool) (api.SectorInfo, error) { + si, err := l.si.StorageFindSector(ctx, abi.SectorID{Miner: l.minerID, Number: sid}, storiface.FTSealed|storiface.FTCache, 0, false) + if err != nil { + return api.SectorInfo{}, err + } + + if len(si) == 0 { + return api.SectorInfo{ + SectorID: sid, + State: api.SectorState(sealing.UndefinedSectorState), + CommD: nil, + CommR: nil, + Proof: nil, + Deals: nil, + Pieces: nil, + Ticket: api.SealTicket{}, + Seed: api.SealSeed{}, + PreCommitMsg: nil, + CommitMsg: nil, + Retries: 0, + ToUpgrade: false, + ReplicaUpdateMessage: nil, + LastErr: "", + Log: nil, + SealProof: 0, + Activation: 0, + Expiration: 0, + DealWeight: abi.DealWeight{}, + VerifiedDealWeight: abi.DealWeight{}, + InitialPledge: abi.TokenAmount{}, + OnTime: 0, + Early: 0, + }, nil + } + + var state api.SectorState = api.SectorState(sealing.Proving) + if !si[0].CanStore { + state = api.SectorState(sealing.PreCommit1) + } + + // todo improve this with on-chain info + return api.SectorInfo{ + SectorID: sid, + State: state, + CommD: nil, + CommR: nil, + Proof: nil, + Deals: nil, + Pieces: nil, + Ticket: api.SealTicket{}, + Seed: api.SealSeed{}, + PreCommitMsg: nil, + CommitMsg: nil, + Retries: 0, + ToUpgrade: false, + ReplicaUpdateMessage: nil, + LastErr: "", + Log: nil, + + SealProof: 0, + Activation: 0, + Expiration: 0, + DealWeight: abi.DealWeight{}, + VerifiedDealWeight: abi.DealWeight{}, + InitialPledge: abi.TokenAmount{}, + OnTime: 0, + Early: 0, + }, nil +} + +func (l *LMRPCProvider) SectorsList(ctx context.Context) ([]abi.SectorNumber, error) { + decls, err := l.si.StorageList(ctx) + if err != nil { + return nil, err + } + + var out []abi.SectorNumber + for _, decl := range decls { + for _, s := range decl { + if s.Miner != l.minerID { + continue + } + + out = append(out, s.SectorID.Number) + } + } + + return out, nil +} + +type sectorParts struct { + sealed, unsealed, cache bool + inStorage bool +} + +func (l *LMRPCProvider) SectorsSummary(ctx context.Context) (map[api.SectorState]int, error) { + decls, err := l.si.StorageList(ctx) + if err != nil { + return nil, err + } + + states := map[abi.SectorID]sectorParts{} + for si, decll := range decls { + sinfo, err := l.si.StorageInfo(ctx, si) + if err != nil { + return nil, err + } + + for _, decl := range decll { + if decl.Miner != l.minerID { + continue + } + + state := states[abi.SectorID{Miner: decl.Miner, Number: decl.SectorID.Number}] + state.sealed = state.sealed || decl.Has(storiface.FTSealed) + state.unsealed = state.unsealed || decl.Has(storiface.FTUnsealed) + state.cache = state.cache || decl.Has(storiface.FTCache) + state.inStorage = state.inStorage || sinfo.CanStore + states[abi.SectorID{Miner: decl.Miner, Number: decl.SectorID.Number}] = state + } + } + + out := map[api.SectorState]int{} + for _, state := range states { + switch { + case state.sealed && state.inStorage: + out[api.SectorState(sealing.Proving)]++ + default: + // not even close to correct, but good enough for now + out[api.SectorState(sealing.PreCommit1)]++ + } + } + + return out, nil +} + +func (l *LMRPCProvider) SectorsListInStates(ctx context.Context, want []api.SectorState) ([]abi.SectorNumber, error) { + decls, err := l.StorageList(ctx) + if err != nil { + return nil, err + } + + wantProving, wantPrecommit1 := false, false + for _, s := range want { + switch s { + case api.SectorState(sealing.Proving): + wantProving = true + case api.SectorState(sealing.PreCommit1): + wantPrecommit1 = true + } + } + + states := map[abi.SectorID]sectorParts{} + + for si, decll := range decls { + sinfo, err := l.si.StorageInfo(ctx, si) + if err != nil { + return nil, err + } + + for _, decl := range decll { + if decl.Miner != l.minerID { + continue + } + + state := states[abi.SectorID{Miner: decl.Miner, Number: decl.SectorID.Number}] + state.sealed = state.sealed || decl.Has(storiface.FTSealed) + state.unsealed = state.unsealed || decl.Has(storiface.FTUnsealed) + state.cache = state.cache || decl.Has(storiface.FTCache) + state.inStorage = state.inStorage || sinfo.CanStore + states[abi.SectorID{Miner: decl.Miner, Number: decl.SectorID.Number}] = state + } + } + var out []abi.SectorNumber + + for id, state := range states { + switch { + case state.sealed && state.inStorage: + if wantProving { + out = append(out, id.Number) + } + default: + // not even close to correct, but good enough for now + if wantPrecommit1 { + out = append(out, id.Number) + } + } + } + + return out, nil +} + +func (l *LMRPCProvider) StorageRedeclareLocal(ctx context.Context, id *storiface.ID, b bool) error { + // so this rescans and redeclares sectors on lotus-miner; whyyy is boost even calling this? + + return nil +} + +func (l *LMRPCProvider) StorageList(ctx context.Context) (map[storiface.ID][]storiface.Decl, error) { + return l.si.StorageList(ctx) +} + +func (l *LMRPCProvider) UnsealSector(ctx context.Context, sectorID abi.SectorNumber, offset abi.UnpaddedPieceSize, length abi.UnpaddedPieceSize) (io.ReadCloser, error) { + return nil, xerrors.Errorf("not supported") +} + +func (l *LMRPCProvider) IsUnsealed(ctx context.Context, sectorNum abi.SectorNumber, offset abi.UnpaddedPieceSize, length abi.UnpaddedPieceSize) (bool, error) { + sectorID := abi.SectorID{Miner: l.minerID, Number: sectorNum} + + si, err := l.si.StorageFindSector(ctx, sectorID, storiface.FTUnsealed, 0, false) + if err != nil { + return false, err + } + + // yes, yes, technically sectors can be partially unsealed, but that is never done in practice + // and can't even be easily done with the current implementation + return len(si) > 0, nil +} + +func (l *LMRPCProvider) ComputeDataCid(ctx context.Context, pieceSize abi.UnpaddedPieceSize, pieceData storiface.Data) (abi.PieceInfo, error) { + return abi.PieceInfo{}, xerrors.Errorf("not supported") +} + +func (l *LMRPCProvider) SectorAddPieceToAny(ctx context.Context, size abi.UnpaddedPieceSize, r storiface.Data, d api.PieceDealInfo) (api.SectorOffset, error) { + if d.DealProposal.PieceSize != abi.PaddedPieceSize(l.ssize) { + return api.SectorOffset{}, xerrors.Errorf("only full-sector pieces are supported") + } + +} + +var _ MinimalLMApi = &LMRPCProvider{} diff --git a/provider/lpseal/task_sdr.go b/provider/lpseal/task_sdr.go index 4a59d205e..e8098ddcc 100644 --- a/provider/lpseal/task_sdr.go +++ b/provider/lpseal/task_sdr.go @@ -110,6 +110,8 @@ func (s *SDRTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done bo return false, xerrors.Errorf("getting miner address: %w", err) } + // FAIL: api may be down + // FAIL-RESP: rely on harmony retry ticket, ticketEpoch, err := s.getTicket(ctx, maddr) if err != nil { return false, xerrors.Errorf("getting ticket: %w", err) @@ -117,6 +119,14 @@ func (s *SDRTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done bo // do the SDR!! + // FAIL: storage may not have enough space + // FAIL-RESP: rely on harmony retry + + // LATEFAIL: compute error in sdr + // LATEFAIL-RESP: Check in Trees task should catch this; Will retry computing + // Trees; After one retry, it should return the sector to the + // SDR stage; max number of retries should be configurable + err = s.sc.GenerateSDR(ctx, sref, ticket, commd) if err != nil { return false, xerrors.Errorf("generating sdr: %w", err) diff --git a/provider/lpseal/task_trees.go b/provider/lpseal/task_trees.go index 67fe86d7f..c382f75c0 100644 --- a/provider/lpseal/task_trees.go +++ b/provider/lpseal/task_trees.go @@ -96,6 +96,9 @@ func (t *TreesTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done return false, xerrors.Errorf("computing tree d: %w", err) } + // todo: Sooo tree-d contains exactly the unsealed data in the prefix + // when we finalize we can totally just truncate that file and move it to unsealed !! + // R / C sealed, unsealed, err := t.sc.TreeRC(ctx, sref, commd) if err != nil { @@ -106,6 +109,10 @@ func (t *TreesTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done return false, xerrors.Errorf("tree-d and tree-r/c unsealed CIDs disagree") } + // todo synth porep + + // todo porep challenge check + n, err := t.db.Exec(ctx, `UPDATE sectors_sdr_pipeline SET after_tree_r = true, after_tree_c = true, after_tree_d = true, tree_r_cid = $3, tree_d_cid = $4 WHERE sp_id = $1 AND sector_number = $2`,