diff --git a/cmd/lotus-provider/tasks/tasks.go b/cmd/lotus-provider/tasks/tasks.go index dd2b9ef83..2570f68e8 100644 --- a/cmd/lotus-provider/tasks/tasks.go +++ b/cmd/lotus-provider/tasks/tasks.go @@ -83,6 +83,10 @@ func StartTasks(ctx context.Context, dependencies *deps.Deps) (*harmonytask.Task precommitTask := lpseal.NewSubmitPrecommitTask(sp, db, full, sender, as, cfg.Fees.MaxPreCommitGasFee) activeTasks = append(activeTasks, precommitTask) } + if cfg.Subsystems.EnablePoRepProof { + porepTask := lpseal.NewPoRepTask(db, full, sp, slr, cfg.Subsystems.PoRepProofMaxTasks) + activeTasks = append(activeTasks, porepTask) + } } log.Infow("This lotus_provider instance handles", "miner_addresses", maddrs, diff --git a/node/config/types.go b/node/config/types.go index e2924de89..430074097 100644 --- a/node/config/types.go +++ b/node/config/types.go @@ -118,6 +118,11 @@ type ProviderSubsystemsConfig struct { // EnableSendPrecommitMsg enables the sending of precommit messages to the chain // from this lotus-provider instance. EnableSendPrecommitMsg bool + + // EnableSendCommitMsg enables the computation of the porep proof + // In lotus-miner this was Commit1 / Commit2 + EnablePoRepProof bool + PoRepProofMaxTasks int } type DAGStoreConfig struct { diff --git a/provider/lpffi/sdr_funcs.go b/provider/lpffi/sdr_funcs.go index 178e39583..5353ff87e 100644 --- a/provider/lpffi/sdr_funcs.go +++ b/provider/lpffi/sdr_funcs.go @@ -167,6 +167,15 @@ func (sb *SealCalls) TreeRC(ctx context.Context, sector storiface.SectorRef, uns return ffi.SealPreCommitPhase2(p1o, paths.Cache, paths.Sealed) } +func (sb *SealCalls) PoRepSnark(ctx context.Context, sn storiface.SectorRef, sealed, unsealed cid.Cid, ticket abi.SealRandomness, seed abi.InteractiveSealRandomness) ([]byte, error) { + vproof, err := sb.sectors.storage.GenetartePoRepVanillaProof(ctx, sn, sealed, unsealed, ticket, seed) + if err != nil { + return nil, xerrors.Errorf("failed to generate vanilla proof: %w", err) + } + + return ffi.SealCommitPhase2(vproof, sn.ID.Number, sn.ID.Miner) +} + func (sb *SealCalls) makePhase1Out(unsCid cid.Cid, spt abi.RegisteredSealProof) ([]byte, error) { commd, err := commcid.CIDToDataCommitmentV1(unsCid) if err != nil { diff --git a/provider/lpseal/poller.go b/provider/lpseal/poller.go index 343a1e7ae..f097b27f5 100644 --- a/provider/lpseal/poller.go +++ b/provider/lpseal/poller.go @@ -21,6 +21,7 @@ const ( pollerSDR = iota pollerTrees pollerPrecommitMsg + pollerPoRep numPollers ) @@ -207,7 +208,7 @@ func (s *SealPoller) poll(ctx context.Context) error { } todoWaitSeed := false - if task.TaskPoRep == nil && todoWaitSeed { + if task.TaskPoRep != nil && todoWaitSeed { // todo start porep task } } diff --git a/provider/lpseal/task_porep.go b/provider/lpseal/task_porep.go new file mode 100644 index 000000000..8a428d20c --- /dev/null +++ b/provider/lpseal/task_porep.go @@ -0,0 +1,161 @@ +package lpseal + +import ( + "bytes" + "context" + "github.com/filecoin-project/go-address" + "github.com/filecoin-project/go-state-types/abi" + "github.com/filecoin-project/go-state-types/crypto" + "github.com/filecoin-project/lotus/chain/types" + "github.com/filecoin-project/lotus/lib/harmony/harmonydb" + "github.com/filecoin-project/lotus/lib/harmony/harmonytask" + "github.com/filecoin-project/lotus/lib/harmony/resources" + "github.com/filecoin-project/lotus/provider/lpffi" + "github.com/filecoin-project/lotus/storage/sealer/storiface" + "github.com/ipfs/go-cid" + "golang.org/x/xerrors" +) + +type PoRepAPI interface { + ChainHead(context.Context) (*types.TipSet, error) + StateGetRandomnessFromBeacon(context.Context, crypto.DomainSeparationTag, abi.ChainEpoch, []byte, types.TipSetKey) (abi.Randomness, error) +} + +type PoRepTask struct { + db *harmonydb.DB + api PoRepAPI + sp *SealPoller + sc *lpffi.SealCalls + + max int +} + +func NewPoRepTask(db *harmonydb.DB, api PoRepAPI, sp *SealPoller, sc *lpffi.SealCalls, maxPoRep int) *PoRepTask { + return &PoRepTask{ + db: db, + api: api, + sp: sp, + sc: sc, + max: maxPoRep, + } +} + +func (p *PoRepTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done bool, err error) { + ctx := context.Background() + + var sectorParamsArr []struct { + SpID int64 `db:"sp_id"` + SectorNumber int64 `db:"sector_number"` + RegSealProof abi.RegisteredSealProof `db:"reg_seal_proof"` + TicketEpoch abi.ChainEpoch `db:"ticket_epoch"` + TicketValue []byte `db:"ticket_value"` + SeedEpoch abi.ChainEpoch `db:"seed_epoch"` + SealedCID string `db:"tree_r_cid"` + UnsealedCID string `db:"tree_d_cid"` + } + + err = p.db.Select(ctx, §orParamsArr, ` + SELECT sp_id, sector_number, reg_seal_proof, ticket_epoch, ticket_value, seed_epoch, tree_r_cid, tree_d_cid + FROM sectors_sdr_pipeline + WHERE task_id_porep = $1`, taskID) + if err != nil { + return false, err + } + if len(sectorParamsArr) != 1 { + return false, xerrors.Errorf("expected 1 sector params, got %d", len(sectorParamsArr)) + } + sectorParams := sectorParamsArr[0] + + sealed, err := cid.Parse(sectorParams.SealedCID) + if err != nil { + return false, xerrors.Errorf("failed to parse sealed cid: %w", err) + } + + unsealed, err := cid.Parse(sectorParams.UnsealedCID) + if err != nil { + return false, xerrors.Errorf("failed to parse unsealed cid: %w", err) + } + + ts, err := p.api.ChainHead(ctx) + if err != nil { + return false, xerrors.Errorf("failed to get chain head: %w", err) + } + + maddr, err := address.NewIDAddress(uint64(sectorParams.SpID)) + if err != nil { + return false, xerrors.Errorf("failed to create miner address: %w", err) + } + + buf := new(bytes.Buffer) + if err := maddr.MarshalCBOR(buf); err != nil { + return false, xerrors.Errorf("failed to marshal miner address: %w", err) + } + + rand, err := p.api.StateGetRandomnessFromBeacon(ctx, crypto.DomainSeparationTag_InteractiveSealChallengeSeed, sectorParams.SeedEpoch, buf.Bytes(), ts.Key()) + if err != nil { + return false, xerrors.Errorf("failed to get randomness for computing seal proof: %w", err) + } + + sr := storiface.SectorRef{ + ID: abi.SectorID{ + Miner: abi.ActorID(sectorParams.SpID), + Number: abi.SectorNumber(sectorParams.SectorNumber), + }, + ProofType: sectorParams.RegSealProof, + } + + // COMPUTE THE PROOF! + + proof, err := p.sc.PoRepSnark(ctx, sr, sealed, unsealed, sectorParams.TicketValue, abi.InteractiveSealRandomness(rand)) + if err != nil { + return false, xerrors.Errorf("failed to compute seal proof: %w", err) + } + + // store success! + n, err := p.db.Exec(ctx, `UPDATE sectors_sdr_pipeline + SET after_sdr = true, seed_value = $3, porep_proof = $4 + WHERE sp_id = $1 AND sector_number = $2`, + sectorParams.SpID, sectorParams.SectorNumber, []byte(rand), proof) + if err != nil { + return false, xerrors.Errorf("store sdr success: updating pipeline: %w", err) + } + if n != 1 { + return false, xerrors.Errorf("store sdr success: updated %d rows", n) + } + + return true, nil +} + +func (p *PoRepTask) CanAccept(ids []harmonytask.TaskID, engine *harmonytask.TaskEngine) (*harmonytask.TaskID, error) { + // todo sort by priority + + id := ids[0] + return &id, nil +} + +func (p *PoRepTask) TypeDetails() harmonytask.TaskTypeDetails { + res := harmonytask.TaskTypeDetails{ + Max: p.max, + Name: "PoRep", + Cost: resources.Resources{ + Cpu: 1, + Gpu: 1, + Ram: 50 << 30, // todo correct value + MachineID: 0, + }, + MaxFailures: 5, + Follows: nil, + } + + if isDevnet { + res.Cost.Ram = 1 << 30 + } + + return res +} + +func (p *PoRepTask) Adder(taskFunc harmonytask.AddTaskFunc) { + p.sp.pollers[pollerPoRep].Set(taskFunc) +} + +var _ harmonytask.TaskInterface = &PoRepTask{} diff --git a/provider/lpseal/task_sdr.go b/provider/lpseal/task_sdr.go index f5a92a7e1..0c6c3e5e5 100644 --- a/provider/lpseal/task_sdr.go +++ b/provider/lpseal/task_sdr.go @@ -168,7 +168,7 @@ func (s *SDRTask) TypeDetails() harmonytask.TaskTypeDetails { Cost: resources.Resources{ // todo offset for prefetch? Cpu: 4, // todo multicore sdr Gpu: 0, - Ram: 54 << 30, // todo measure; lower on 2k devnetn + Ram: 54 << 30, }, MaxFailures: 2, Follows: nil,