From 2b405f433cd59598f8f064d69e7ecfdcf1ea5ac4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Fri, 27 Oct 2023 16:10:55 +0200 Subject: [PATCH 1/2] lpwindow: Mostly done proper Do --- go.mod | 2 +- lib/harmony/harmonydb/sql/20230823.sql | 13 +- provider/builder.go | 14 +- provider/lpwindow/do.go | 431 +++++++++++++++++++++++++ provider/lpwindow/faults_simple.go | 149 +++++++++ provider/lpwindow/task.go | 55 +++- storage/sealer/worker_local.go | 11 +- 7 files changed, 655 insertions(+), 20 deletions(-) create mode 100644 provider/lpwindow/do.go create mode 100644 provider/lpwindow/faults_simple.go diff --git a/go.mod b/go.mod index 4c654c16e..1ec936089 100644 --- a/go.mod +++ b/go.mod @@ -134,6 +134,7 @@ require ( github.com/multiformats/go-varint v0.0.7 github.com/open-rpc/meta-schema v0.0.0-20201029221707-1b72ef2ea333 github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58 + github.com/pkg/errors v0.9.1 github.com/polydawn/refmt v0.89.0 github.com/prometheus/client_golang v1.16.0 github.com/puzpuzpuz/xsync/v2 v2.4.0 @@ -310,7 +311,6 @@ require ( github.com/opentracing/opentracing-go v1.2.0 // indirect github.com/pelletier/go-toml/v2 v2.0.8 // indirect github.com/petar/GoLLRB v0.0.0-20210522233825-ae3b015fd3e9 // indirect - github.com/pkg/errors v0.9.1 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/prometheus/client_model v0.4.0 // indirect github.com/prometheus/common v0.42.0 // indirect diff --git a/lib/harmony/harmonydb/sql/20230823.sql b/lib/harmony/harmonydb/sql/20230823.sql index 3b375c332..535599767 100644 --- a/lib/harmony/harmonydb/sql/20230823.sql +++ b/lib/harmony/harmonydb/sql/20230823.sql @@ -19,13 +19,14 @@ comment on column wdpost_partition_tasks.partition_index is 'partition index wit create table wdpost_proofs ( - deadline bigint not null, - partitions bytea not null, - proof_type bigint, - proof_bytes bytea, - chain_commit_epoch bigint, - chain_commit_rand bytea + sp_id bigint not null, + deadline bigint not null, + partition bigint not null, + submit_at_epoch bigint not null, + submit_by_epoch bigint not null, + proof_message bytea ); + diff --git a/provider/builder.go b/provider/builder.go index 325c175e8..bc700f52e 100644 --- a/provider/builder.go +++ b/provider/builder.go @@ -2,28 +2,32 @@ package provider import ( "context" + "github.com/filecoin-project/lotus/storage/paths" + "github.com/filecoin-project/lotus/storage/sealer" + "time" logging "github.com/ipfs/go-log/v2" "github.com/filecoin-project/lotus/api" - "github.com/filecoin-project/lotus/journal" "github.com/filecoin-project/lotus/lib/harmony/harmonydb" "github.com/filecoin-project/lotus/node/config" dtypes "github.com/filecoin-project/lotus/node/modules/dtypes" "github.com/filecoin-project/lotus/provider/chainsched" "github.com/filecoin-project/lotus/provider/lpwindow" "github.com/filecoin-project/lotus/storage/ctladdr" - "github.com/filecoin-project/lotus/storage/sealer" "github.com/filecoin-project/lotus/storage/sealer/storiface" ) var log = logging.Logger("provider") func WindowPostScheduler(ctx context.Context, fc config.LotusProviderFees, pc config.ProvingConfig, - api api.FullNode, sealer sealer.SectorManager, verif storiface.Verifier, j journal.Journal, - as *ctladdr.AddressSelector, maddr []dtypes.MinerAddress, db *harmonydb.DB, max int) (*lpwindow.WdPostTask, error) { + api api.FullNode, verif storiface.Verifier, lw *sealer.LocalWorker, + as *ctladdr.AddressSelector, maddr []dtypes.MinerAddress, db *harmonydb.DB, stor paths.Store, idx 
paths.SectorIndex) (*lpwindow.WdPostTask, error) { chainSched := chainsched.New(api) - return lpwindow.NewWdPostTask(db, nil, chainSched, maddr) + // todo config + ft := lpwindow.NewSimpleFaultTracker(stor, idx, 32, 5*time.Second, 300*time.Second) + + return lpwindow.NewWdPostTask(db, api, ft, lw, verif, chainSched, maddr) } diff --git a/provider/lpwindow/do.go b/provider/lpwindow/do.go new file mode 100644 index 000000000..ffcccc628 --- /dev/null +++ b/provider/lpwindow/do.go @@ -0,0 +1,431 @@ +package lpwindow + +import ( + "bytes" + "context" + ffi "github.com/filecoin-project/filecoin-ffi" + "github.com/filecoin-project/go-address" + "github.com/filecoin-project/go-bitfield" + "github.com/filecoin-project/go-state-types/abi" + "github.com/filecoin-project/go-state-types/builtin" + miner2 "github.com/filecoin-project/go-state-types/builtin/v9/miner" + "github.com/filecoin-project/go-state-types/crypto" + "github.com/filecoin-project/go-state-types/dline" + "github.com/filecoin-project/go-state-types/proof" + "github.com/filecoin-project/lotus/build" + types "github.com/filecoin-project/lotus/chain/types" + "github.com/filecoin-project/lotus/storage/sealer/storiface" + proof7 "github.com/filecoin-project/specs-actors/v7/actors/runtime/proof" + "github.com/ipfs/go-cid" + "go.uber.org/multierr" + "golang.org/x/xerrors" + "sort" + "sync" + "time" +) + +const disablePreChecks = false // todo config + +func (t *WdPostTask) doPartition(ctx context.Context, ts *types.TipSet, maddr address.Address, di *dline.Info, partIdx uint64) (out *miner2.SubmitWindowedPoStParams, err error) { + defer func() { + if r := recover(); r != nil { + log.Errorf("recover: %s", r) + err = xerrors.Errorf("panic in doPartition: %s", r) + } + }() + + buf := new(bytes.Buffer) + if err := maddr.MarshalCBOR(buf); err != nil { + return nil, xerrors.Errorf("failed to marshal address to cbor: %w", err) + } + + headTs, err := t.api.ChainHead(ctx) + if err != nil { + return nil, xerrors.Errorf("getting current head: %w", err) + } + + rand, err := t.api.StateGetRandomnessFromBeacon(ctx, crypto.DomainSeparationTag_WindowedPoStChallengeSeed, di.Challenge, buf.Bytes(), headTs.Key()) + if err != nil { + return nil, xerrors.Errorf("failed to get chain randomness from beacon for window post (ts=%d; deadline=%d): %w", ts.Height(), di, err) + } + + parts, err := t.api.StateMinerPartitions(ctx, maddr, di.Index, ts.Key()) + if err != nil { + return nil, xerrors.Errorf("getting partitions: %w", err) + } + + if partIdx >= uint64(len(parts)) { + return nil, xerrors.Errorf("invalid partIdx %d (deadline has %d partitions)", partIdx, len(parts)) + } + + partition := parts[partIdx] + + params := miner2.SubmitWindowedPoStParams{ + Deadline: di.Index, + Partitions: make([]miner2.PoStPartition, 0, 1), + Proofs: nil, + } + + var partitions []miner2.PoStPartition + var xsinfos []proof7.ExtendedSectorInfo + + { + toProve, err := bitfield.SubtractBitField(partition.LiveSectors, partition.FaultySectors) + if err != nil { + return nil, xerrors.Errorf("removing faults from set of sectors to prove: %w", err) + } + /*if manual { + // this is a check run, we want to prove faulty sectors, even + // if they are not declared as recovering. 
+ toProve = partition.LiveSectors + }*/ + toProve, err = bitfield.MergeBitFields(toProve, partition.RecoveringSectors) + if err != nil { + return nil, xerrors.Errorf("adding recoveries to set of sectors to prove: %w", err) + } + + good, err := toProve.Copy() + if err != nil { + return nil, xerrors.Errorf("copy toProve: %w", err) + } + if !disablePreChecks { + good, err = t.checkSectors(ctx, maddr, toProve, ts.Key()) + if err != nil { + return nil, xerrors.Errorf("checking sectors to skip: %w", err) + } + } + + /*good, err = bitfield.SubtractBitField(good, postSkipped) + if err != nil { + return nil, xerrors.Errorf("toProve - postSkipped: %w", err) + } + + post skipped is legacy retry mechanism, shouldn't be needed anymore + */ + + skipped, err := bitfield.SubtractBitField(toProve, good) + if err != nil { + return nil, xerrors.Errorf("toProve - good: %w", err) + } + + sc, err := skipped.Count() + if err != nil { + return nil, xerrors.Errorf("getting skipped sector count: %w", err) + } + + skipCount := sc + + ssi, err := t.sectorsForProof(ctx, maddr, good, partition.AllSectors, ts) + if err != nil { + return nil, xerrors.Errorf("getting sorted sector info: %w", err) + } + + if len(ssi) == 0 { + return nil, xerrors.Errorf("no sectors to prove") + } + + xsinfos = append(xsinfos, ssi...) + partitions = append(partitions, miner2.PoStPartition{ + Index: partIdx, + Skipped: skipped, + }) + + log.Infow("running window post", + "chain-random", rand, + "deadline", di, + "height", ts.Height(), + "skipped", skipCount) + + tsStart := build.Clock.Now() + + mid, err := address.IDFromAddress(maddr) + if err != nil { + return nil, err + } + + nv, err := t.api.StateNetworkVersion(ctx, ts.Key()) + if err != nil { + return nil, xerrors.Errorf("getting network version: %w", err) + } + + ppt, err := xsinfos[0].SealProof.RegisteredWindowPoStProofByNetworkVersion(nv) + if err != nil { + return nil, xerrors.Errorf("failed to get window post type: %w", err) + } + + postOut, ps, err := t.generateWindowPoSt(ctx, ppt, abi.ActorID(mid), xsinfos, append(abi.PoStRandomness{}, rand...)) + elapsed := time.Since(tsStart) + log.Infow("computing window post", "partition", partIdx, "elapsed", elapsed, "skip", len(ps), "err", err) + if err != nil { + log.Errorf("error generating window post: %s", err) + } + + if err == nil { + // If we proved nothing, something is very wrong. + if len(postOut) == 0 { + log.Errorf("len(postOut) == 0") + return nil, xerrors.Errorf("received no proofs back from generate window post") + } + + headTs, err := t.api.ChainHead(ctx) + if err != nil { + return nil, xerrors.Errorf("getting current head: %w", err) + } + + checkRand, err := t.api.StateGetRandomnessFromBeacon(ctx, crypto.DomainSeparationTag_WindowedPoStChallengeSeed, di.Challenge, buf.Bytes(), headTs.Key()) + if err != nil { + return nil, xerrors.Errorf("failed to get chain randomness from beacon for window post (ts=%d; deadline=%d): %w", ts.Height(), di, err) + } + + if !bytes.Equal(checkRand, rand) { + // this is a check from legacy code, there it would retry with new randomness. + // here we don't retry because the current network version uses beacon randomness + // which should never change. We do keep this check tho to detect potential issues. 
+ return nil, xerrors.Errorf("post generation randomness was different from random beacon") + } + + sinfos := make([]proof7.SectorInfo, len(xsinfos)) + for i, xsi := range xsinfos { + sinfos[i] = proof7.SectorInfo{ + SealProof: xsi.SealProof, + SectorNumber: xsi.SectorNumber, + SealedCID: xsi.SealedCID, + } + } + if correct, err := t.verifier.VerifyWindowPoSt(ctx, proof.WindowPoStVerifyInfo{ + Randomness: abi.PoStRandomness(checkRand), + Proofs: postOut, + ChallengedSectors: sinfos, + Prover: abi.ActorID(mid), + }); err != nil { + /*log.Errorw("window post verification failed", "post", postOut, "error", err) + time.Sleep(5 * time.Second) + continue todo retry loop */ + } else if !correct { + /*log.Errorw("generated incorrect window post proof", "post", postOut, "error", err) + continue todo retry loop */ + } + + // Proof generation successful, stop retrying + //somethingToProve = true + params.Partitions = partitions + params.Proofs = postOut + //break + + return ¶ms, nil + } + } + + return nil, xerrors.Errorf("failed to generate window post") +} + +func (t *WdPostTask) checkSectors(ctx context.Context, maddr address.Address, check bitfield.BitField, tsk types.TipSetKey) (bitfield.BitField, error) { + mid, err := address.IDFromAddress(maddr) + if err != nil { + return bitfield.BitField{}, xerrors.Errorf("failed to convert to ID addr: %w", err) + } + + sectorInfos, err := t.api.StateMinerSectors(ctx, maddr, &check, tsk) + if err != nil { + return bitfield.BitField{}, xerrors.Errorf("failed to get sector infos: %w", err) + } + + type checkSector struct { + sealed cid.Cid + update bool + } + + sectors := make(map[abi.SectorNumber]checkSector) + var tocheck []storiface.SectorRef + for _, info := range sectorInfos { + sectors[info.SectorNumber] = checkSector{ + sealed: info.SealedCID, + update: info.SectorKeyCID != nil, + } + tocheck = append(tocheck, storiface.SectorRef{ + ProofType: info.SealProof, + ID: abi.SectorID{ + Miner: abi.ActorID(mid), + Number: info.SectorNumber, + }, + }) + } + + if len(tocheck) == 0 { + return bitfield.BitField{}, nil + } + + pp, err := tocheck[0].ProofType.RegisteredWindowPoStProof() + if err != nil { + return bitfield.BitField{}, xerrors.Errorf("failed to get window PoSt proof: %w", err) + } + pp, err = pp.ToV1_1PostProof() + if err != nil { + return bitfield.BitField{}, xerrors.Errorf("failed to convert to v1_1 post proof: %w", err) + } + + bad, err := t.faultTracker.CheckProvable(ctx, pp, tocheck, func(ctx context.Context, id abi.SectorID) (cid.Cid, bool, error) { + s, ok := sectors[id.Number] + if !ok { + return cid.Undef, false, xerrors.Errorf("sealed CID not found") + } + return s.sealed, s.update, nil + }) + if err != nil { + return bitfield.BitField{}, xerrors.Errorf("checking provable sectors: %w", err) + } + for id := range bad { + delete(sectors, id.Number) + } + + log.Warnw("Checked sectors", "checked", len(tocheck), "good", len(sectors)) + + sbf := bitfield.New() + for s := range sectors { + sbf.Set(uint64(s)) + } + + return sbf, nil +} + +func (t *WdPostTask) sectorsForProof(ctx context.Context, maddr address.Address, goodSectors, allSectors bitfield.BitField, ts *types.TipSet) ([]proof7.ExtendedSectorInfo, error) { + sset, err := t.api.StateMinerSectors(ctx, maddr, &goodSectors, ts.Key()) + if err != nil { + return nil, err + } + + if len(sset) == 0 { + return nil, nil + } + + sectorByID := make(map[uint64]proof7.ExtendedSectorInfo, len(sset)) + for _, sector := range sset { + sectorByID[uint64(sector.SectorNumber)] = proof7.ExtendedSectorInfo{ + 
SectorNumber: sector.SectorNumber, + SealedCID: sector.SealedCID, + SealProof: sector.SealProof, + SectorKey: sector.SectorKeyCID, + } + } + + proofSectors := make([]proof7.ExtendedSectorInfo, 0, len(sset)) + if err := allSectors.ForEach(func(sectorNo uint64) error { + if info, found := sectorByID[sectorNo]; found { + proofSectors = append(proofSectors, info) + } else { + //skip + // todo: testing: old logic used to put 'substitute' sectors here + // that probably isn't needed post nv19, but we do need to check that + } + return nil + }); err != nil { + return nil, xerrors.Errorf("iterating partition sector bitmap: %w", err) + } + + return proofSectors, nil +} + +func (t *WdPostTask) generateWindowPoSt(ctx context.Context, ppt abi.RegisteredPoStProof, minerID abi.ActorID, sectorInfo []proof.ExtendedSectorInfo, randomness abi.PoStRandomness) ([]proof.PoStProof, []abi.SectorID, error) { + var retErr error = nil + randomness[31] &= 0x3f + + out := make([]proof.PoStProof, 0) + + if len(sectorInfo) == 0 { + return nil, nil, xerrors.New("generate window post len(sectorInfo)=0") + } + + maxPartitionSize, err := builtin.PoStProofWindowPoStPartitionSectors(ppt) // todo proxy through chain/actors + if err != nil { + return nil, nil, xerrors.Errorf("get sectors count of partition failed:%+v", err) + } + + // The partitions number of this batch + // ceil(sectorInfos / maxPartitionSize) + partitionCount := uint64((len(sectorInfo) + int(maxPartitionSize) - 1) / int(maxPartitionSize)) + if partitionCount > 1 { + return nil, nil, xerrors.Errorf("generateWindowPoSt partitionCount:%d, only support 1", partitionCount) + } + + log.Infof("generateWindowPoSt maxPartitionSize:%d partitionCount:%d", maxPartitionSize, partitionCount) + + var skipped []abi.SectorID + var flk sync.Mutex + cctx, cancel := context.WithCancel(ctx) + defer cancel() + + sort.Slice(sectorInfo, func(i, j int) bool { + return sectorInfo[i].SectorNumber < sectorInfo[j].SectorNumber + }) + + sectorNums := make([]abi.SectorNumber, len(sectorInfo)) + sectorMap := make(map[abi.SectorNumber]proof.ExtendedSectorInfo) + for i, s := range sectorInfo { + sectorNums[i] = s.SectorNumber + sectorMap[s.SectorNumber] = s + } + + postChallenges, err := ffi.GeneratePoStFallbackSectorChallenges(ppt, minerID, randomness, sectorNums) + if err != nil { + return nil, nil, xerrors.Errorf("generating fallback challenges: %v", err) + } + + proofList := make([]ffi.PartitionProof, partitionCount) + var wg sync.WaitGroup + wg.Add(int(partitionCount)) + + for partIdx := uint64(0); partIdx < partitionCount; partIdx++ { + go func(partIdx uint64) { + defer wg.Done() + + sectors := make([]storiface.PostSectorChallenge, 0) + for i := uint64(0); i < maxPartitionSize; i++ { + si := i + partIdx*maxPartitionSize + if si >= uint64(len(postChallenges.Sectors)) { + break + } + + snum := postChallenges.Sectors[si] + sinfo := sectorMap[snum] + + sectors = append(sectors, storiface.PostSectorChallenge{ + SealProof: sinfo.SealProof, + SectorNumber: snum, + SealedCID: sinfo.SealedCID, + Challenge: postChallenges.Challenges[snum], + Update: sinfo.SectorKey != nil, + }) + } + + pr, err := t.prover.GenerateWindowPoStAdv(cctx, ppt, minerID, sectors, int(partIdx), randomness, true) + sk := pr.Skipped + + if err != nil || len(sk) > 0 { + log.Errorf("generateWindowPost part:%d, skipped:%d, sectors: %d, err: %+v", partIdx, len(sk), len(sectors), err) + flk.Lock() + skipped = append(skipped, sk...) 
+ + if err != nil { + retErr = multierr.Append(retErr, xerrors.Errorf("partitionIndex:%d err:%+v", partIdx, err)) + } + flk.Unlock() + } + + proofList[partIdx] = ffi.PartitionProof(pr.PoStProofs) + }(partIdx) + } + + wg.Wait() + + /* if len(skipped) > 0 { + return nil, skipped, multierr.Append(xerrors.Errorf("some sectors (%d) were skipped", len(skipped)), retErr) + }*/ + + postProofs, err := ffi.MergeWindowPoStPartitionProofs(ppt, proofList) + if err != nil { + return nil, skipped, xerrors.Errorf("merge windowPoSt partition proofs: %v", err) + } + + out = append(out, *postProofs) + return out, skipped, retErr +} diff --git a/provider/lpwindow/faults_simple.go b/provider/lpwindow/faults_simple.go new file mode 100644 index 000000000..b79a7dcf7 --- /dev/null +++ b/provider/lpwindow/faults_simple.go @@ -0,0 +1,149 @@ +package lpwindow + +import ( + "context" + "crypto/rand" + "fmt" + ffi "github.com/filecoin-project/filecoin-ffi" + "github.com/filecoin-project/go-state-types/abi" + "github.com/filecoin-project/lotus/storage/paths" + "github.com/filecoin-project/lotus/storage/sealer/storiface" + "golang.org/x/xerrors" + "sync" + "time" +) + +type SimpleFaultTracker struct { + storage paths.Store + index paths.SectorIndex + + parallelCheckLimit int // todo live config? + singleCheckTimeout time.Duration + partitionCheckTimeout time.Duration +} + +func NewSimpleFaultTracker(storage paths.Store, index paths.SectorIndex, + parallelCheckLimit int, singleCheckTimeout time.Duration, partitionCheckTimeout time.Duration) *SimpleFaultTracker { + return &SimpleFaultTracker{ + storage: storage, + index: index, + + parallelCheckLimit: parallelCheckLimit, + singleCheckTimeout: singleCheckTimeout, + partitionCheckTimeout: partitionCheckTimeout, + } +} + +func (m *SimpleFaultTracker) CheckProvable(ctx context.Context, pp abi.RegisteredPoStProof, sectors []storiface.SectorRef, rg storiface.RGetter) (map[abi.SectorID]string, error) { + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + if rg == nil { + return nil, xerrors.Errorf("rg is nil") + } + + var bad = make(map[abi.SectorID]string) + var badLk sync.Mutex + + var postRand abi.PoStRandomness = make([]byte, abi.RandomnessLength) + _, _ = rand.Read(postRand) + postRand[31] &= 0x3f + + limit := m.parallelCheckLimit + if limit <= 0 { + limit = len(sectors) + } + throttle := make(chan struct{}, limit) + + addBad := func(s abi.SectorID, reason string) { + badLk.Lock() + bad[s] = reason + badLk.Unlock() + } + + if m.partitionCheckTimeout > 0 { + var cancel2 context.CancelFunc + ctx, cancel2 = context.WithTimeout(ctx, m.partitionCheckTimeout) + defer cancel2() + } + + var wg sync.WaitGroup + wg.Add(len(sectors)) + + for _, sector := range sectors { + select { + case throttle <- struct{}{}: + case <-ctx.Done(): + addBad(sector.ID, fmt.Sprintf("waiting for check worker: %s", ctx.Err())) + wg.Done() + continue + } + + go func(sector storiface.SectorRef) { + defer wg.Done() + defer func() { + <-throttle + }() + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + commr, update, err := rg(ctx, sector.ID) + if err != nil { + log.Warnw("CheckProvable Sector FAULT: getting commR", "sector", sector, "sealed", "err", err) + addBad(sector.ID, fmt.Sprintf("getting commR: %s", err)) + return + } + + toLock := storiface.FTSealed | storiface.FTCache + if update { + toLock = storiface.FTUpdate | storiface.FTUpdateCache + } + + locked, err := m.index.StorageTryLock(ctx, sector.ID, toLock, storiface.FTNone) + if err != nil { + addBad(sector.ID, fmt.Sprintf("tryLock 
error: %s", err)) + return + } + + if !locked { + log.Warnw("CheckProvable Sector FAULT: can't acquire read lock", "sector", sector) + addBad(sector.ID, fmt.Sprint("can't acquire read lock")) + return + } + + ch, err := ffi.GeneratePoStFallbackSectorChallenges(pp, sector.ID.Miner, postRand, []abi.SectorNumber{ + sector.ID.Number, + }) + if err != nil { + log.Warnw("CheckProvable Sector FAULT: generating challenges", "sector", sector, "err", err) + addBad(sector.ID, fmt.Sprintf("generating fallback challenges: %s", err)) + return + } + + vctx := ctx + + if m.singleCheckTimeout > 0 { + var cancel2 context.CancelFunc + vctx, cancel2 = context.WithTimeout(ctx, m.singleCheckTimeout) + defer cancel2() + } + + _, err = m.storage.GenerateSingleVanillaProof(vctx, sector.ID.Miner, storiface.PostSectorChallenge{ + SealProof: sector.ProofType, + SectorNumber: sector.ID.Number, + SealedCID: commr, + Challenge: ch.Challenges[sector.ID.Number], + Update: update, + }, pp) + if err != nil { + log.Warnw("CheckProvable Sector FAULT: generating vanilla proof", "sector", sector, "err", err) + addBad(sector.ID, fmt.Sprintf("generating vanilla proof: %s", err)) + return + } + }(sector) + } + + wg.Wait() + + return bad, nil +} diff --git a/provider/lpwindow/task.go b/provider/lpwindow/task.go index 6bdfa7d70..e3cfa017b 100644 --- a/provider/lpwindow/task.go +++ b/provider/lpwindow/task.go @@ -2,6 +2,10 @@ package lpwindow import ( "context" + "github.com/filecoin-project/go-bitfield" + "github.com/filecoin-project/go-state-types/crypto" + "github.com/filecoin-project/go-state-types/network" + "github.com/filecoin-project/lotus/storage/sealer" "sort" "time" @@ -45,12 +49,23 @@ type WDPoStAPI interface { StateMinerInfo(context.Context, address.Address, types.TipSetKey) (api.MinerInfo, error) ChainGetTipSetAfterHeight(context.Context, abi.ChainEpoch, types.TipSetKey) (*types.TipSet, error) StateMinerPartitions(context.Context, address.Address, uint64, types.TipSetKey) ([]api.Partition, error) + StateGetRandomnessFromBeacon(ctx context.Context, personalization crypto.DomainSeparationTag, randEpoch abi.ChainEpoch, entropy []byte, tsk types.TipSetKey) (abi.Randomness, error) + StateNetworkVersion(context.Context, types.TipSetKey) (network.Version, error) + StateMinerSectors(context.Context, address.Address, *bitfield.BitField, types.TipSetKey) ([]*miner.SectorOnChainInfo, error) +} + +type ProverPoSt interface { + GenerateWindowPoStAdv(ctx context.Context, ppt abi.RegisteredPoStProof, mid abi.ActorID, sectors []storiface.PostSectorChallenge, partitionIdx int, randomness abi.PoStRandomness, allowSkip bool) (storiface.WindowPoStResult, error) } type WdPostTask struct { api WDPoStAPI db *harmonydb.DB + faultTracker sealer.FaultTracker + prover ProverPoSt + verifier storiface.Verifier + windowPoStTF promise.Promise[harmonytask.AddTaskFunc] actors []dtypes.MinerAddress @@ -68,8 +83,6 @@ func (t *WdPostTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done time.Sleep(5 * time.Second) log.Errorf("WdPostTask.Do() called with taskID: %v", taskID) - var deadline dline.Info - var spID, pps, dlIdx, partIdx uint64 err = t.db.QueryRow(context.Background(), @@ -89,14 +102,34 @@ func (t *WdPostTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done return false, err } - wdpost.NewDeadlineInfo(abi.ChainEpoch(pps), dlIdx, head.Height()) + deadline := wdpost.NewDeadlineInfo(abi.ChainEpoch(pps), dlIdx, head.Height()) if deadline.PeriodElapsed() { log.Errorf("WdPost removed stale task: %v %v", taskID, deadline) return 
true, nil } - panic("todo") + maddr, err := address.NewIDAddress(spID) + if err != nil { + log.Errorf("WdPostTask.Do() failed to NewIDAddress: %v", err) + return false, err + } + + ts, err := t.api.ChainGetTipSetAfterHeight(context.Background(), deadline.Challenge, head.Key()) + if err != nil { + log.Errorf("WdPostTask.Do() failed to ChainGetTipSetAfterHeight: %v", err) + return false, err + } + + postOut, err := t.doPartition(context.Background(), ts, maddr, deadline, partIdx) + if err != nil { + log.Errorf("WdPostTask.Do() failed to doPartition: %v", err) + return false, err + } + + panic("todo record") + + _ = postOut /*submitWdPostParams, err := t.Scheduler.runPoStCycle(context.Background(), false, deadline, ts) if err != nil { @@ -307,11 +340,23 @@ func (t *WdPostTask) processHeadChange(ctx context.Context, revert, apply *types return nil } -func NewWdPostTask(db *harmonydb.DB, api WDPoStAPI, pcs *chainsched.ProviderChainSched, actors []dtypes.MinerAddress) (*WdPostTask, error) { +func NewWdPostTask(db *harmonydb.DB, + api WDPoStAPI, + faultTracker sealer.FaultTracker, + prover ProverPoSt, + verifier storiface.Verifier, + + pcs *chainsched.ProviderChainSched, + actors []dtypes.MinerAddress, +) (*WdPostTask, error) { t := &WdPostTask{ db: db, api: api, + faultTracker: faultTracker, + prover: prover, + verifier: verifier, + actors: actors, } diff --git a/storage/sealer/worker_local.go b/storage/sealer/worker_local.go index cc4a81599..65db921dc 100644 --- a/storage/sealer/worker_local.go +++ b/storage/sealer/worker_local.go @@ -647,6 +647,10 @@ func (l *LocalWorker) GenerateWinningPoSt(ctx context.Context, ppt abi.Registere } func (l *LocalWorker) GenerateWindowPoSt(ctx context.Context, ppt abi.RegisteredPoStProof, mid abi.ActorID, sectors []storiface.PostSectorChallenge, partitionIdx int, randomness abi.PoStRandomness) (storiface.WindowPoStResult, error) { + return l.GenerateWindowPoStAdv(ctx, ppt, mid, sectors, partitionIdx, randomness, false) +} + +func (l *LocalWorker) GenerateWindowPoStAdv(ctx context.Context, ppt abi.RegisteredPoStProof, mid abi.ActorID, sectors []storiface.PostSectorChallenge, partitionIdx int, randomness abi.PoStRandomness, allowSkip bool) (storiface.WindowPoStResult, error) { sb, err := l.executor() if err != nil { return storiface.WindowPoStResult{}, err @@ -658,7 +662,7 @@ func (l *LocalWorker) GenerateWindowPoSt(ctx context.Context, ppt abi.Registered var wg sync.WaitGroup wg.Add(len(sectors)) - vproofs := make([][]byte, len(sectors)) + vproofs := make([][]byte, 0, len(sectors)) for i, s := range sectors { if l.challengeThrottle != nil { @@ -696,12 +700,13 @@ func (l *LocalWorker) GenerateWindowPoSt(ctx context.Context, ppt abi.Registered return } - vproofs[i] = vanilla + //vproofs[i] = vanilla // todo substitutes?? + vproofs = append(vproofs, vanilla) }(i, s) } wg.Wait() - if len(skipped) > 0 { + if len(skipped) > 0 && !allowSkip { // This should happen rarely because before entering GenerateWindowPoSt we check all sectors by reading challenges. 
// When it does happen, window post runner logic will just re-check sectors, and retry with newly-discovered-bad sectors skipped log.Errorf("couldn't read some challenges (skipped %d)", len(skipped)) From 3d40105fc3610f85af8eb9f0cb5ad5e6e6f31de2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Fri, 27 Oct 2023 18:22:39 +0200 Subject: [PATCH 2/2] lpwindow: Fix build --- cmd/lotus-provider/run.go | 28 +++++++--------------------- provider/lpwindow/task.go | 3 +-- 2 files changed, 8 insertions(+), 23 deletions(-) diff --git a/cmd/lotus-provider/run.go b/cmd/lotus-provider/run.go index 24241eab1..6c3a191e8 100644 --- a/cmd/lotus-provider/run.go +++ b/cmd/lotus-provider/run.go @@ -10,7 +10,6 @@ import ( "github.com/gin-contrib/pprof" "github.com/gin-gonic/gin" - "github.com/ipfs/go-datastore/namespace" "github.com/pkg/errors" "github.com/urfave/cli/v2" "go.opencensus.io/stats" @@ -19,8 +18,6 @@ import ( "github.com/filecoin-project/go-address" "github.com/filecoin-project/go-jsonrpc/auth" - "github.com/filecoin-project/go-statestore" - "github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/build" lcli "github.com/filecoin-project/lotus/cli" @@ -254,24 +251,13 @@ var runCmd = &cli.Command{ return err } + // todo fetch limit config stor := paths.NewRemote(lstor, si, http.Header(sa), 10, &paths.DefaultPartialFileHandler{}) - mds, err := lr.Datastore(ctx, "/metadata") - if err != nil { - return err - } - wsts := statestore.New(namespace.Wrap(mds, modules.WorkerCallsPrefix)) - smsts := statestore.New(namespace.Wrap(mds, modules.ManagerWorkPrefix)) - sealer, err := sealer.New(ctx, lstor, stor, lr, si, cfg.SealerConfig, config.ProvingConfig{}, wsts, smsts) - if err != nil { - return err - } - - //ds, dsCloser, err := modules.DatastoreV2(ctx, false, lr) - if err != nil { - return err - } - //defer dsCloser() + // todo localWorker isn't the abstraction layer we want to use here, we probably want to go straight to ffiwrapper + // maybe with a lotus-provider specific abstraction. LocalWorker does persistent call tracking which we probably + // don't need (ehh.. maybe we do, the async callback system may actually work decently well with harmonytask) + lw := sealer.NewLocalWorker(sealer.WorkerConfig{}, stor, localStore, si, nil, nil) var maddrs []dtypes.MinerAddress for _, s := range cfg.Addresses.MinerAddresses { @@ -283,8 +269,8 @@ var runCmd = &cli.Command{ } if cfg.Subsystems.EnableWindowPost { - wdPostTask, err := provider.WindowPostScheduler(ctx, cfg.Fees, cfg.Proving, full, sealer, verif, j, - as, maddrs, db, cfg.Subsystems.WindowPostMaxTasks) + wdPostTask, err := provider.WindowPostScheduler(ctx, cfg.Fees, cfg.Proving, full, verif, lw, + as, maddrs, db, stor, si) if err != nil { return err } diff --git a/provider/lpwindow/task.go b/provider/lpwindow/task.go index e3cfa017b..67d6c328c 100644 --- a/provider/lpwindow/task.go +++ b/provider/lpwindow/task.go @@ -375,8 +375,7 @@ func (t *WdPostTask) addTaskToDB(taskId harmonytask.TaskID, taskIdent wdTaskIden sp_id, proving_period_start, deadline_index, - partition_index, - + partition_index ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10 , $11, $12, $13, $14)`, taskId, taskIdent.Sp_id,
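
Note on the remaining panic("todo record") in WdPostTask.Do: the first patch reworks the wdpost_proofs table (sp_id, deadline, partition, submit_at_epoch, submit_by_epoch, proof_message) but never writes to it yet. Below is a minimal sketch, not part of this patch, of what that recording step could look like. It assumes harmonydb's Exec helper, CBOR-marshalling of the SubmitWindowedPoStParams returned by doPartition, and that the submit window is bounded by the deadline's Open/Close epochs from dline.Info; imports such as "bytes" are elided.

	// Sketch only (assumption, not part of this patch): persist the proof
	// produced by doPartition so a separate submitter task can later put it
	// on chain, instead of the current panic("todo record").
	var proofMsg bytes.Buffer
	if err := postOut.MarshalCBOR(&proofMsg); err != nil {
		return false, xerrors.Errorf("marshaling SubmitWindowedPoStParams: %w", err)
	}

	// submit_at_epoch / submit_by_epoch are assumed here to be the deadline's
	// open and close epochs; the eventual implementation may pick other bounds.
	_, err = t.db.Exec(context.Background(),
		`INSERT INTO wdpost_proofs
		        (sp_id, deadline, partition, submit_at_epoch, submit_by_epoch, proof_message)
		 VALUES ($1, $2, $3, $4, $5, $6)`,
		spID, deadline.Index, partIdx, deadline.Open, deadline.Close, proofMsg.Bytes())
	if err != nil {
		return false, xerrors.Errorf("recording window post proof: %w", err)
	}

With a row recorded this way, a follow-up task would only need to watch chain height against submit_at_epoch/submit_by_epoch and push proof_message, which is consistent with the submitter split this patch series appears to be heading toward.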