Merge branch 'feat/wdpost-adder2' into simpleharmony
commit a46faaa598
@@ -10,8 +10,6 @@ import (
 	"github.com/gin-contrib/pprof"
 	"github.com/gin-gonic/gin"
-	"github.com/ipfs/go-datastore"
-	"github.com/ipfs/go-datastore/namespace"
 	"github.com/pkg/errors"
 	"github.com/urfave/cli/v2"
 	"go.opencensus.io/stats"
@@ -19,8 +17,6 @@ import (
 	"github.com/filecoin-project/go-address"
 	"github.com/filecoin-project/go-jsonrpc/auth"
-	"github.com/filecoin-project/go-statestore"

 	"github.com/filecoin-project/lotus/api"
 	"github.com/filecoin-project/lotus/build"
 	lcli "github.com/filecoin-project/lotus/cli"
@@ -213,15 +209,13 @@ var runCmd = &cli.Command{
 			return err
 		}

+		// todo fetch limit config
 		stor := paths.NewRemote(localStore, si, http.Header(sa), 10, &paths.DefaultPartialFileHandler{})

-		unusedDataStore := datastore.NewMapDatastore()
-		wsts := statestore.New(namespace.Wrap(unusedDataStore, modules.WorkerCallsPrefix))
-		smsts := statestore.New(namespace.Wrap(unusedDataStore, modules.ManagerWorkPrefix))
-		sealer, err := sealer.New(ctx, localStore, stor, bls, si, cfg.SealerConfig, config.ProvingConfig{}, wsts, smsts)
-		if err != nil {
-			return err
-		}
+		// todo localWorker isn't the abstraction layer we want to use here, we probably want to go straight to ffiwrapper
+		// maybe with a lotus-provider specific abstraction. LocalWorker does persistent call tracking which we probably
+		// don't need (ehh.. maybe we do, the async callback system may actually work decently well with harmonytask)
+		lw := sealer.NewLocalWorker(sealer.WorkerConfig{}, stor, localStore, si, nil, nil)

 		var maddrs []dtypes.MinerAddress
 		for _, s := range cfg.Addresses.MinerAddresses {
@@ -239,8 +233,8 @@ var runCmd = &cli.Command{
 		{

 			if cfg.Subsystems.EnableWindowPost {
-				wdPostTask, err := provider.WindowPostScheduler(ctx, cfg.Fees, cfg.Proving, full, sealer, verif, j,
-					as, maddrs, db, cfg.Subsystems.WindowPostMaxTasks)
+				wdPostTask, err := provider.WindowPostScheduler(ctx, cfg.Fees, cfg.Proving, full, verif, lw,
+					as, maddrs, db, stor, si)
 				if err != nil {
 					return err
 				}
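Note: the net effect of these hunks is that runCmd no longer builds a full sealer.SectorManager (with its statestore-backed call tracking) just to feed the window PoSt scheduler; it hands over a bare LocalWorker plus the remote store and sector index instead. For reference, the scheduler's new signature, as defined in the provider.go hunk further down:

func WindowPostScheduler(ctx context.Context, fc config.LotusProviderFees, pc config.ProvingConfig,
	api api.FullNode, verif storiface.Verifier, lw *sealer.LocalWorker,
	as *ctladdr.AddressSelector, maddr []dtypes.MinerAddress, db *harmonydb.DB, stor paths.Store, idx paths.SectorIndex) (*lpwindow.WdPostTask, error)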
go.mod (2 changed lines)
@@ -134,6 +134,7 @@ require (
 	github.com/multiformats/go-varint v0.0.7
 	github.com/open-rpc/meta-schema v0.0.0-20201029221707-1b72ef2ea333
 	github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58
+	github.com/pkg/errors v0.9.1
 	github.com/polydawn/refmt v0.89.0
 	github.com/prometheus/client_golang v1.16.0
 	github.com/puzpuzpuz/xsync/v2 v2.4.0
@@ -311,7 +312,6 @@ require (
 	github.com/opentracing/opentracing-go v1.2.0 // indirect
 	github.com/pelletier/go-toml/v2 v2.0.8 // indirect
 	github.com/petar/GoLLRB v0.0.0-20210522233825-ae3b015fd3e9 // indirect
-	github.com/pkg/errors v0.9.1 // indirect
 	github.com/pmezard/go-difflib v1.0.0 // indirect
 	github.com/prometheus/client_model v0.4.0 // indirect
 	github.com/prometheus/common v0.42.0 // indirect
@@ -19,13 +19,14 @@ comment on column wdpost_partition_tasks.partition_index is 'partition index wit

 create table wdpost_proofs
 (
-    deadline           bigint not null,
-    partitions         bytea  not null,
-    proof_type         bigint,
-    proof_bytes        bytea,
-    chain_commit_epoch bigint,
-    chain_commit_rand  bytea
+    sp_id           bigint not null,
+    deadline        bigint not null,
+    partition       bigint not null,
+    submit_at_epoch bigint not null,
+    submit_by_epoch bigint not null,
+    proof_message   bytea
 );

@@ -2,28 +2,32 @@ package provider

 import (
 	"context"
+	"github.com/filecoin-project/lotus/storage/paths"
+	"github.com/filecoin-project/lotus/storage/sealer"
+	"time"

 	logging "github.com/ipfs/go-log/v2"

 	"github.com/filecoin-project/lotus/api"
-	"github.com/filecoin-project/lotus/journal"
 	"github.com/filecoin-project/lotus/lib/harmony/harmonydb"
 	"github.com/filecoin-project/lotus/node/config"
 	dtypes "github.com/filecoin-project/lotus/node/modules/dtypes"
 	"github.com/filecoin-project/lotus/provider/chainsched"
 	"github.com/filecoin-project/lotus/provider/lpwindow"
 	"github.com/filecoin-project/lotus/storage/ctladdr"
-	"github.com/filecoin-project/lotus/storage/sealer"
 	"github.com/filecoin-project/lotus/storage/sealer/storiface"
 )

 var log = logging.Logger("provider")

 func WindowPostScheduler(ctx context.Context, fc config.LotusProviderFees, pc config.ProvingConfig,
-	api api.FullNode, sealer sealer.SectorManager, verif storiface.Verifier, j journal.Journal,
-	as *ctladdr.AddressSelector, maddr []dtypes.MinerAddress, db *harmonydb.DB, max int) (*lpwindow.WdPostTask, error) {
+	api api.FullNode, verif storiface.Verifier, lw *sealer.LocalWorker,
+	as *ctladdr.AddressSelector, maddr []dtypes.MinerAddress, db *harmonydb.DB, stor paths.Store, idx paths.SectorIndex) (*lpwindow.WdPostTask, error) {

 	chainSched := chainsched.New(api)

-	return lpwindow.NewWdPostTask(db, nil, chainSched, maddr)
+	// todo config
+	ft := lpwindow.NewSimpleFaultTracker(stor, idx, 32, 5*time.Second, 300*time.Second)
+
+	return lpwindow.NewWdPostTask(db, api, ft, lw, verif, chainSched, maddr)
 }
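Note: the fault-tracker knobs are hardcoded here (32 parallel checks, a 5s per-sector timeout, a 300s per-partition timeout) behind the "// todo config" marker. One hypothetical way to resolve that todo, assuming the existing ProvingConfig fields of these names are the intended source (nothing in this diff confirms that):

// Hypothetical sketch only; the config fields are assumptions, not part of this change.
ft := lpwindow.NewSimpleFaultTracker(stor, idx,
	pc.ParallelCheckLimit,                   // replaces the literal 32
	time.Duration(pc.SingleCheckTimeout),    // replaces 5*time.Second
	time.Duration(pc.PartitionCheckTimeout), // replaces 300*time.Second
)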
provider/lpwindow/do.go (new file, 431 lines)
@@ -0,0 +1,431 @@
+package lpwindow
+
+import (
+	"bytes"
+	"context"
+	ffi "github.com/filecoin-project/filecoin-ffi"
+	"github.com/filecoin-project/go-address"
+	"github.com/filecoin-project/go-bitfield"
+	"github.com/filecoin-project/go-state-types/abi"
+	"github.com/filecoin-project/go-state-types/builtin"
+	miner2 "github.com/filecoin-project/go-state-types/builtin/v9/miner"
+	"github.com/filecoin-project/go-state-types/crypto"
+	"github.com/filecoin-project/go-state-types/dline"
+	"github.com/filecoin-project/go-state-types/proof"
+	"github.com/filecoin-project/lotus/build"
+	types "github.com/filecoin-project/lotus/chain/types"
+	"github.com/filecoin-project/lotus/storage/sealer/storiface"
+	proof7 "github.com/filecoin-project/specs-actors/v7/actors/runtime/proof"
+	"github.com/ipfs/go-cid"
+	"go.uber.org/multierr"
+	"golang.org/x/xerrors"
+	"sort"
+	"sync"
+	"time"
+)
+
+const disablePreChecks = false // todo config
+
+func (t *WdPostTask) doPartition(ctx context.Context, ts *types.TipSet, maddr address.Address, di *dline.Info, partIdx uint64) (out *miner2.SubmitWindowedPoStParams, err error) {
+	defer func() {
+		if r := recover(); r != nil {
+			log.Errorf("recover: %s", r)
+			err = xerrors.Errorf("panic in doPartition: %s", r)
+		}
+	}()
+
+	buf := new(bytes.Buffer)
+	if err := maddr.MarshalCBOR(buf); err != nil {
+		return nil, xerrors.Errorf("failed to marshal address to cbor: %w", err)
+	}
+
+	headTs, err := t.api.ChainHead(ctx)
+	if err != nil {
+		return nil, xerrors.Errorf("getting current head: %w", err)
+	}
+
+	rand, err := t.api.StateGetRandomnessFromBeacon(ctx, crypto.DomainSeparationTag_WindowedPoStChallengeSeed, di.Challenge, buf.Bytes(), headTs.Key())
+	if err != nil {
+		return nil, xerrors.Errorf("failed to get chain randomness from beacon for window post (ts=%d; deadline=%d): %w", ts.Height(), di, err)
+	}
+
+	parts, err := t.api.StateMinerPartitions(ctx, maddr, di.Index, ts.Key())
+	if err != nil {
+		return nil, xerrors.Errorf("getting partitions: %w", err)
+	}
+
+	if partIdx >= uint64(len(parts)) {
+		return nil, xerrors.Errorf("invalid partIdx %d (deadline has %d partitions)", partIdx, len(parts))
+	}
+
+	partition := parts[partIdx]
+
+	params := miner2.SubmitWindowedPoStParams{
+		Deadline:   di.Index,
+		Partitions: make([]miner2.PoStPartition, 0, 1),
+		Proofs:     nil,
+	}
+
+	var partitions []miner2.PoStPartition
+	var xsinfos []proof7.ExtendedSectorInfo
+
+	{
+		toProve, err := bitfield.SubtractBitField(partition.LiveSectors, partition.FaultySectors)
+		if err != nil {
+			return nil, xerrors.Errorf("removing faults from set of sectors to prove: %w", err)
+		}
+		/*if manual {
+			// this is a check run, we want to prove faulty sectors, even
+			// if they are not declared as recovering.
+			toProve = partition.LiveSectors
+		}*/
+		toProve, err = bitfield.MergeBitFields(toProve, partition.RecoveringSectors)
+		if err != nil {
+			return nil, xerrors.Errorf("adding recoveries to set of sectors to prove: %w", err)
+		}
+
+		good, err := toProve.Copy()
+		if err != nil {
+			return nil, xerrors.Errorf("copy toProve: %w", err)
+		}
+		if !disablePreChecks {
+			good, err = t.checkSectors(ctx, maddr, toProve, ts.Key())
+			if err != nil {
+				return nil, xerrors.Errorf("checking sectors to skip: %w", err)
+			}
+		}
+
+		/*good, err = bitfield.SubtractBitField(good, postSkipped)
+		if err != nil {
+			return nil, xerrors.Errorf("toProve - postSkipped: %w", err)
+		}
+
+		post skipped is legacy retry mechanism, shouldn't be needed anymore
+		*/
+
+		skipped, err := bitfield.SubtractBitField(toProve, good)
+		if err != nil {
+			return nil, xerrors.Errorf("toProve - good: %w", err)
+		}
+
+		sc, err := skipped.Count()
+		if err != nil {
+			return nil, xerrors.Errorf("getting skipped sector count: %w", err)
+		}
+
+		skipCount := sc
+
+		ssi, err := t.sectorsForProof(ctx, maddr, good, partition.AllSectors, ts)
+		if err != nil {
+			return nil, xerrors.Errorf("getting sorted sector info: %w", err)
+		}
+
+		if len(ssi) == 0 {
+			return nil, xerrors.Errorf("no sectors to prove")
+		}
+
+		xsinfos = append(xsinfos, ssi...)
+		partitions = append(partitions, miner2.PoStPartition{
+			Index:   partIdx,
+			Skipped: skipped,
+		})
+
+		log.Infow("running window post",
+			"chain-random", rand,
+			"deadline", di,
+			"height", ts.Height(),
+			"skipped", skipCount)
+
+		tsStart := build.Clock.Now()
+
+		mid, err := address.IDFromAddress(maddr)
+		if err != nil {
+			return nil, err
+		}
+
+		nv, err := t.api.StateNetworkVersion(ctx, ts.Key())
+		if err != nil {
+			return nil, xerrors.Errorf("getting network version: %w", err)
+		}
+
+		ppt, err := xsinfos[0].SealProof.RegisteredWindowPoStProofByNetworkVersion(nv)
+		if err != nil {
+			return nil, xerrors.Errorf("failed to get window post type: %w", err)
+		}
+
+		postOut, ps, err := t.generateWindowPoSt(ctx, ppt, abi.ActorID(mid), xsinfos, append(abi.PoStRandomness{}, rand...))
+		elapsed := time.Since(tsStart)
+		log.Infow("computing window post", "partition", partIdx, "elapsed", elapsed, "skip", len(ps), "err", err)
+		if err != nil {
+			log.Errorf("error generating window post: %s", err)
+		}
+
+		if err == nil {
+			// If we proved nothing, something is very wrong.
+			if len(postOut) == 0 {
+				log.Errorf("len(postOut) == 0")
+				return nil, xerrors.Errorf("received no proofs back from generate window post")
+			}
+
+			headTs, err := t.api.ChainHead(ctx)
+			if err != nil {
+				return nil, xerrors.Errorf("getting current head: %w", err)
+			}
+
+			checkRand, err := t.api.StateGetRandomnessFromBeacon(ctx, crypto.DomainSeparationTag_WindowedPoStChallengeSeed, di.Challenge, buf.Bytes(), headTs.Key())
+			if err != nil {
+				return nil, xerrors.Errorf("failed to get chain randomness from beacon for window post (ts=%d; deadline=%d): %w", ts.Height(), di, err)
+			}
+
+			if !bytes.Equal(checkRand, rand) {
+				// this is a check from legacy code, there it would retry with new randomness.
+				// here we don't retry because the current network version uses beacon randomness
+				// which should never change. We do keep this check tho to detect potential issues.
+				return nil, xerrors.Errorf("post generation randomness was different from random beacon")
+			}

+			sinfos := make([]proof7.SectorInfo, len(xsinfos))
+			for i, xsi := range xsinfos {
+				sinfos[i] = proof7.SectorInfo{
+					SealProof:    xsi.SealProof,
+					SectorNumber: xsi.SectorNumber,
+					SealedCID:    xsi.SealedCID,
+				}
+			}
+			if correct, err := t.verifier.VerifyWindowPoSt(ctx, proof.WindowPoStVerifyInfo{
+				Randomness:        abi.PoStRandomness(checkRand),
+				Proofs:            postOut,
+				ChallengedSectors: sinfos,
+				Prover:            abi.ActorID(mid),
+			}); err != nil {
+				/*log.Errorw("window post verification failed", "post", postOut, "error", err)
+				time.Sleep(5 * time.Second)
+				continue todo retry loop */
+			} else if !correct {
+				/*log.Errorw("generated incorrect window post proof", "post", postOut, "error", err)
+				continue todo retry loop */
+			}
+
+			// Proof generation successful, stop retrying
+			//somethingToProve = true
+			params.Partitions = partitions
+			params.Proofs = postOut
+			//break
+
+			return &params, nil
+		}
+	}
+
+	return nil, xerrors.Errorf("failed to generate window post")
+}
+
+func (t *WdPostTask) checkSectors(ctx context.Context, maddr address.Address, check bitfield.BitField, tsk types.TipSetKey) (bitfield.BitField, error) {
+	mid, err := address.IDFromAddress(maddr)
+	if err != nil {
+		return bitfield.BitField{}, xerrors.Errorf("failed to convert to ID addr: %w", err)
+	}
+
+	sectorInfos, err := t.api.StateMinerSectors(ctx, maddr, &check, tsk)
+	if err != nil {
+		return bitfield.BitField{}, xerrors.Errorf("failed to get sector infos: %w", err)
+	}
+
+	type checkSector struct {
+		sealed cid.Cid
+		update bool
+	}
+
+	sectors := make(map[abi.SectorNumber]checkSector)
+	var tocheck []storiface.SectorRef
+	for _, info := range sectorInfos {
+		sectors[info.SectorNumber] = checkSector{
+			sealed: info.SealedCID,
+			update: info.SectorKeyCID != nil,
+		}
+		tocheck = append(tocheck, storiface.SectorRef{
+			ProofType: info.SealProof,
+			ID: abi.SectorID{
+				Miner:  abi.ActorID(mid),
+				Number: info.SectorNumber,
+			},
+		})
+	}
+
+	if len(tocheck) == 0 {
+		return bitfield.BitField{}, nil
+	}
+
+	pp, err := tocheck[0].ProofType.RegisteredWindowPoStProof()
+	if err != nil {
+		return bitfield.BitField{}, xerrors.Errorf("failed to get window PoSt proof: %w", err)
+	}
+	pp, err = pp.ToV1_1PostProof()
+	if err != nil {
+		return bitfield.BitField{}, xerrors.Errorf("failed to convert to v1_1 post proof: %w", err)
+	}
+
+	bad, err := t.faultTracker.CheckProvable(ctx, pp, tocheck, func(ctx context.Context, id abi.SectorID) (cid.Cid, bool, error) {
+		s, ok := sectors[id.Number]
+		if !ok {
+			return cid.Undef, false, xerrors.Errorf("sealed CID not found")
+		}
+		return s.sealed, s.update, nil
+	})
+	if err != nil {
+		return bitfield.BitField{}, xerrors.Errorf("checking provable sectors: %w", err)
+	}
+	for id := range bad {
+		delete(sectors, id.Number)
+	}
+
+	log.Warnw("Checked sectors", "checked", len(tocheck), "good", len(sectors))
+
+	sbf := bitfield.New()
+	for s := range sectors {
+		sbf.Set(uint64(s))
+	}
+
+	return sbf, nil
+}
+
+func (t *WdPostTask) sectorsForProof(ctx context.Context, maddr address.Address, goodSectors, allSectors bitfield.BitField, ts *types.TipSet) ([]proof7.ExtendedSectorInfo, error) {
+	sset, err := t.api.StateMinerSectors(ctx, maddr, &goodSectors, ts.Key())
+	if err != nil {
+		return nil, err
+	}
+
+	if len(sset) == 0 {
+		return nil, nil
+	}
+
+	sectorByID := make(map[uint64]proof7.ExtendedSectorInfo, len(sset))
+	for _, sector := range sset {
+		sectorByID[uint64(sector.SectorNumber)] = proof7.ExtendedSectorInfo{
+			SectorNumber: sector.SectorNumber,
+			SealedCID:    sector.SealedCID,
+			SealProof:    sector.SealProof,
+			SectorKey:    sector.SectorKeyCID,
+		}
+	}
+
+	proofSectors := make([]proof7.ExtendedSectorInfo, 0, len(sset))
+	if err := allSectors.ForEach(func(sectorNo uint64) error {
+		if info, found := sectorByID[sectorNo]; found {
+			proofSectors = append(proofSectors, info)
+		} else {
+			//skip
+			// todo: testing: old logic used to put 'substitute' sectors here
+			// that probably isn't needed post nv19, but we do need to check that
+		}
+		return nil
+	}); err != nil {
+		return nil, xerrors.Errorf("iterating partition sector bitmap: %w", err)
+	}
+
+	return proofSectors, nil
+}
+
+func (t *WdPostTask) generateWindowPoSt(ctx context.Context, ppt abi.RegisteredPoStProof, minerID abi.ActorID, sectorInfo []proof.ExtendedSectorInfo, randomness abi.PoStRandomness) ([]proof.PoStProof, []abi.SectorID, error) {
+	var retErr error = nil
+	randomness[31] &= 0x3f
+
+	out := make([]proof.PoStProof, 0)
+
+	if len(sectorInfo) == 0 {
+		return nil, nil, xerrors.New("generate window post len(sectorInfo)=0")
+	}
+
+	maxPartitionSize, err := builtin.PoStProofWindowPoStPartitionSectors(ppt) // todo proxy through chain/actors
+	if err != nil {
+		return nil, nil, xerrors.Errorf("get sectors count of partition failed:%+v", err)
+	}
+
+	// The partitions number of this batch
+	// ceil(sectorInfos / maxPartitionSize)
+	partitionCount := uint64((len(sectorInfo) + int(maxPartitionSize) - 1) / int(maxPartitionSize))
+	if partitionCount > 1 {
+		return nil, nil, xerrors.Errorf("generateWindowPoSt partitionCount:%d, only support 1", partitionCount)
+	}
+
+	log.Infof("generateWindowPoSt maxPartitionSize:%d partitionCount:%d", maxPartitionSize, partitionCount)
+
+	var skipped []abi.SectorID
+	var flk sync.Mutex
+	cctx, cancel := context.WithCancel(ctx)
+	defer cancel()
+
+	sort.Slice(sectorInfo, func(i, j int) bool {
+		return sectorInfo[i].SectorNumber < sectorInfo[j].SectorNumber
+	})
+
+	sectorNums := make([]abi.SectorNumber, len(sectorInfo))
+	sectorMap := make(map[abi.SectorNumber]proof.ExtendedSectorInfo)
+	for i, s := range sectorInfo {
+		sectorNums[i] = s.SectorNumber
+		sectorMap[s.SectorNumber] = s
+	}
+
+	postChallenges, err := ffi.GeneratePoStFallbackSectorChallenges(ppt, minerID, randomness, sectorNums)
+	if err != nil {
+		return nil, nil, xerrors.Errorf("generating fallback challenges: %v", err)
+	}
+
+	proofList := make([]ffi.PartitionProof, partitionCount)
+	var wg sync.WaitGroup
+	wg.Add(int(partitionCount))
+
+	for partIdx := uint64(0); partIdx < partitionCount; partIdx++ {
+		go func(partIdx uint64) {
+			defer wg.Done()
+
+			sectors := make([]storiface.PostSectorChallenge, 0)
+			for i := uint64(0); i < maxPartitionSize; i++ {
+				si := i + partIdx*maxPartitionSize
+				if si >= uint64(len(postChallenges.Sectors)) {
+					break
+				}
+
+				snum := postChallenges.Sectors[si]
+				sinfo := sectorMap[snum]
+
+				sectors = append(sectors, storiface.PostSectorChallenge{
+					SealProof:    sinfo.SealProof,
+					SectorNumber: snum,
+					SealedCID:    sinfo.SealedCID,
+					Challenge:    postChallenges.Challenges[snum],
+					Update:       sinfo.SectorKey != nil,
+				})
+			}
+
+			pr, err := t.prover.GenerateWindowPoStAdv(cctx, ppt, minerID, sectors, int(partIdx), randomness, true)
+			sk := pr.Skipped
+
+			if err != nil || len(sk) > 0 {
+				log.Errorf("generateWindowPost part:%d, skipped:%d, sectors: %d, err: %+v", partIdx, len(sk), len(sectors), err)
+				flk.Lock()
+				skipped = append(skipped, sk...)
+
+				if err != nil {
+					retErr = multierr.Append(retErr, xerrors.Errorf("partitionIndex:%d err:%+v", partIdx, err))
+				}
+				flk.Unlock()
+			}
+
+			proofList[partIdx] = ffi.PartitionProof(pr.PoStProofs)
+		}(partIdx)
+	}
+
+	wg.Wait()
+
+	/* if len(skipped) > 0 {
+		return nil, skipped, multierr.Append(xerrors.Errorf("some sectors (%d) were skipped", len(skipped)), retErr)
+	}*/
+
+	postProofs, err := ffi.MergeWindowPoStPartitionProofs(ppt, proofList)
+	if err != nil {
+		return nil, skipped, xerrors.Errorf("merge windowPoSt partition proofs: %v", err)
+	}
+
+	out = append(out, *postProofs)
+	return out, skipped, retErr
+}
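Note: generateWindowPoSt sizes the batch with integer ceiling division and then refuses anything above one partition. A self-contained illustration of that arithmetic (2349 is the window PoSt partition size for 32GiB sectors, the kind of value builtin.PoStProofWindowPoStPartitionSectors returns):

package main

import "fmt"

// Demonstrates ceil(len(sectorInfo) / maxPartitionSize) as computed above.
func main() {
	maxPartitionSize := 2349
	for _, sectorCount := range []int{1, 2349, 2350} {
		partitionCount := (sectorCount + maxPartitionSize - 1) / maxPartitionSize
		fmt.Printf("%d sectors -> %d partition(s)\n", sectorCount, partitionCount)
	}
	// 2350 sectors yield 2 partitions, which generateWindowPoSt currently
	// rejects with "only support 1".
}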
provider/lpwindow/faults_simple.go (new file, 149 lines)
@@ -0,0 +1,149 @@
+package lpwindow
+
+import (
+	"context"
+	"crypto/rand"
+	"fmt"
+	ffi "github.com/filecoin-project/filecoin-ffi"
+	"github.com/filecoin-project/go-state-types/abi"
+	"github.com/filecoin-project/lotus/storage/paths"
+	"github.com/filecoin-project/lotus/storage/sealer/storiface"
+	"golang.org/x/xerrors"
+	"sync"
+	"time"
+)
+
+type SimpleFaultTracker struct {
+	storage paths.Store
+	index   paths.SectorIndex
+
+	parallelCheckLimit    int // todo live config?
+	singleCheckTimeout    time.Duration
+	partitionCheckTimeout time.Duration
+}
+
+func NewSimpleFaultTracker(storage paths.Store, index paths.SectorIndex,
+	parallelCheckLimit int, singleCheckTimeout time.Duration, partitionCheckTimeout time.Duration) *SimpleFaultTracker {
+	return &SimpleFaultTracker{
+		storage: storage,
+		index:   index,
+
+		parallelCheckLimit:    parallelCheckLimit,
+		singleCheckTimeout:    singleCheckTimeout,
+		partitionCheckTimeout: partitionCheckTimeout,
+	}
+}
+
+func (m *SimpleFaultTracker) CheckProvable(ctx context.Context, pp abi.RegisteredPoStProof, sectors []storiface.SectorRef, rg storiface.RGetter) (map[abi.SectorID]string, error) {
+	ctx, cancel := context.WithCancel(ctx)
+	defer cancel()
+
+	if rg == nil {
+		return nil, xerrors.Errorf("rg is nil")
+	}
+
+	var bad = make(map[abi.SectorID]string)
+	var badLk sync.Mutex
+
+	var postRand abi.PoStRandomness = make([]byte, abi.RandomnessLength)
+	_, _ = rand.Read(postRand)
+	postRand[31] &= 0x3f
+
+	limit := m.parallelCheckLimit
+	if limit <= 0 {
+		limit = len(sectors)
+	}
+	throttle := make(chan struct{}, limit)
+
+	addBad := func(s abi.SectorID, reason string) {
+		badLk.Lock()
+		bad[s] = reason
+		badLk.Unlock()
+	}
+
+	if m.partitionCheckTimeout > 0 {
+		var cancel2 context.CancelFunc
+		ctx, cancel2 = context.WithTimeout(ctx, m.partitionCheckTimeout)
+		defer cancel2()
+	}
+
+	var wg sync.WaitGroup
+	wg.Add(len(sectors))
+
+	for _, sector := range sectors {
+		select {
+		case throttle <- struct{}{}:
+		case <-ctx.Done():
+			addBad(sector.ID, fmt.Sprintf("waiting for check worker: %s", ctx.Err()))
+			wg.Done()
+			continue
+		}
+
+		go func(sector storiface.SectorRef) {
+			defer wg.Done()
+			defer func() {
+				<-throttle
+			}()
+			ctx, cancel := context.WithCancel(ctx)
+			defer cancel()
+
+			commr, update, err := rg(ctx, sector.ID)
+			if err != nil {
+				log.Warnw("CheckProvable Sector FAULT: getting commR", "sector", sector, "sealed", "err", err)
+				addBad(sector.ID, fmt.Sprintf("getting commR: %s", err))
+				return
+			}
+
+			toLock := storiface.FTSealed | storiface.FTCache
+			if update {
+				toLock = storiface.FTUpdate | storiface.FTUpdateCache
+			}
+
+			locked, err := m.index.StorageTryLock(ctx, sector.ID, toLock, storiface.FTNone)
+			if err != nil {
+				addBad(sector.ID, fmt.Sprintf("tryLock error: %s", err))
+				return
+			}
+
+			if !locked {
+				log.Warnw("CheckProvable Sector FAULT: can't acquire read lock", "sector", sector)
+				addBad(sector.ID, fmt.Sprint("can't acquire read lock"))
+				return
+			}
+
+			ch, err := ffi.GeneratePoStFallbackSectorChallenges(pp, sector.ID.Miner, postRand, []abi.SectorNumber{
+				sector.ID.Number,
+			})
+			if err != nil {
+				log.Warnw("CheckProvable Sector FAULT: generating challenges", "sector", sector, "err", err)
+				addBad(sector.ID, fmt.Sprintf("generating fallback challenges: %s", err))
+				return
+			}
+
+			vctx := ctx
+
+			if m.singleCheckTimeout > 0 {
+				var cancel2 context.CancelFunc
+				vctx, cancel2 = context.WithTimeout(ctx, m.singleCheckTimeout)
+				defer cancel2()
+			}
+
+			_, err = m.storage.GenerateSingleVanillaProof(vctx, sector.ID.Miner, storiface.PostSectorChallenge{
+				SealProof:    sector.ProofType,
+				SectorNumber: sector.ID.Number,
+				SealedCID:    commr,
+				Challenge:    ch.Challenges[sector.ID.Number],
+				Update:       update,
+			}, pp)
+			if err != nil {
+				log.Warnw("CheckProvable Sector FAULT: generating vanilla proof", "sector", sector, "err", err)
+				addBad(sector.ID, fmt.Sprintf("generating vanilla proof: %s", err))
+				return
+			}
+		}(sector)
+	}
+
+	wg.Wait()
+
+	return bad, nil
+}
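Note: CheckProvable bounds its concurrency with a buffered channel used as a counting semaphore: sending acquires a slot (or the select bails out on context cancellation), and the deferred receive releases it. A minimal standalone sketch of the same pattern:

package main

import (
	"fmt"
	"sync"
)

// At most `limit` goroutines run the body at once, mirroring the
// throttle channel in CheckProvable.
func main() {
	jobs := []int{1, 2, 3, 4, 5, 6}
	limit := 2
	throttle := make(chan struct{}, limit)

	var wg sync.WaitGroup
	wg.Add(len(jobs))
	for _, j := range jobs {
		throttle <- struct{}{} // acquire a slot; blocks when limit is reached
		go func(j int) {
			defer wg.Done()
			defer func() { <-throttle }() // release the slot
			fmt.Println("checking sector", j)
		}(j)
	}
	wg.Wait()
}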
@@ -2,6 +2,10 @@ package lpwindow

 import (
 	"context"
+	"github.com/filecoin-project/go-bitfield"
+	"github.com/filecoin-project/go-state-types/crypto"
+	"github.com/filecoin-project/go-state-types/network"
+	"github.com/filecoin-project/lotus/storage/sealer"
 	"sort"
 	"time"

@@ -45,12 +49,23 @@ type WDPoStAPI interface {
 	StateMinerInfo(context.Context, address.Address, types.TipSetKey) (api.MinerInfo, error)
 	ChainGetTipSetAfterHeight(context.Context, abi.ChainEpoch, types.TipSetKey) (*types.TipSet, error)
 	StateMinerPartitions(context.Context, address.Address, uint64, types.TipSetKey) ([]api.Partition, error)
+	StateGetRandomnessFromBeacon(ctx context.Context, personalization crypto.DomainSeparationTag, randEpoch abi.ChainEpoch, entropy []byte, tsk types.TipSetKey) (abi.Randomness, error)
+	StateNetworkVersion(context.Context, types.TipSetKey) (network.Version, error)
+	StateMinerSectors(context.Context, address.Address, *bitfield.BitField, types.TipSetKey) ([]*miner.SectorOnChainInfo, error)
+}
+
+type ProverPoSt interface {
+	GenerateWindowPoStAdv(ctx context.Context, ppt abi.RegisteredPoStProof, mid abi.ActorID, sectors []storiface.PostSectorChallenge, partitionIdx int, randomness abi.PoStRandomness, allowSkip bool) (storiface.WindowPoStResult, error)
 }

 type WdPostTask struct {
 	api WDPoStAPI
 	db  *harmonydb.DB

+	faultTracker sealer.FaultTracker
+	prover       ProverPoSt
+	verifier     storiface.Verifier
+
 	windowPoStTF promise.Promise[harmonytask.AddTaskFunc]

 	actors []dtypes.MinerAddress
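Note: the new ProverPoSt interface is shaped so that sealer.LocalWorker satisfies it via the GenerateWindowPoStAdv method added at the bottom of this change. An illustrative compile-time assertion (not part of the diff) would be:

var _ ProverPoSt = (*sealer.LocalWorker)(nil)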
@@ -68,8 +83,6 @@ func (t *WdPostTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done
 	time.Sleep(5 * time.Second)
 	log.Errorf("WdPostTask.Do() called with taskID: %v", taskID)

-	var deadline dline.Info
-
 	var spID, pps, dlIdx, partIdx uint64

 	err = t.db.QueryRow(context.Background(),
@@ -89,14 +102,34 @@ func (t *WdPostTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done
 		return false, err
 	}

-	wdpost.NewDeadlineInfo(abi.ChainEpoch(pps), dlIdx, head.Height())
+	deadline := wdpost.NewDeadlineInfo(abi.ChainEpoch(pps), dlIdx, head.Height())

 	if deadline.PeriodElapsed() {
 		log.Errorf("WdPost removed stale task: %v %v", taskID, deadline)
 		return true, nil
 	}

-	panic("todo")
+	maddr, err := address.NewIDAddress(spID)
+	if err != nil {
+		log.Errorf("WdPostTask.Do() failed to NewIDAddress: %v", err)
+		return false, err
+	}
+
+	ts, err := t.api.ChainGetTipSetAfterHeight(context.Background(), deadline.Challenge, head.Key())
+	if err != nil {
+		log.Errorf("WdPostTask.Do() failed to ChainGetTipSetAfterHeight: %v", err)
+		return false, err
+	}
+
+	postOut, err := t.doPartition(context.Background(), ts, maddr, deadline, partIdx)
+	if err != nil {
+		log.Errorf("WdPostTask.Do() failed to doPartition: %v", err)
+		return false, err
+	}
+
+	panic("todo record")
+
+	_ = postOut

 	/*submitWdPostParams, err := t.Scheduler.runPoStCycle(context.Background(), false, deadline, ts)
 	if err != nil {
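Note: Do() still stops at panic("todo record") before persisting anything, while the migration earlier in this change reshapes wdpost_proofs to hold exactly this output. A hypothetical sketch of the recording step, assuming *harmonydb.DB grows an Exec method analogous to the QueryRow call above (nothing in this diff shows one):

// Hypothetical only: persist the partition's proof params for later submission.
var pbuf bytes.Buffer
if err := postOut.MarshalCBOR(&pbuf); err != nil {
	return false, err
}
_, err = t.db.Exec(context.Background(), `insert into wdpost_proofs
	(sp_id, deadline, partition, submit_at_epoch, submit_by_epoch, proof_message)
	values ($1, $2, $3, $4, $5, $6)`,
	spID, deadline.Index, partIdx, deadline.Open, deadline.Close, pbuf.Bytes())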
@@ -307,11 +340,23 @@ func (t *WdPostTask) processHeadChange(ctx context.Context, revert, apply *types
 	return nil
 }

-func NewWdPostTask(db *harmonydb.DB, api WDPoStAPI, pcs *chainsched.ProviderChainSched, actors []dtypes.MinerAddress) (*WdPostTask, error) {
+func NewWdPostTask(db *harmonydb.DB,
+	api WDPoStAPI,
+	faultTracker sealer.FaultTracker,
+	prover ProverPoSt,
+	verifier storiface.Verifier,
+
+	pcs *chainsched.ProviderChainSched,
+	actors []dtypes.MinerAddress,
+) (*WdPostTask, error) {
 	t := &WdPostTask{
 		db:  db,
 		api: api,

+		faultTracker: faultTracker,
+		prover:       prover,
+		verifier:     verifier,
+
 		actors: actors,
 	}

@@ -330,8 +375,7 @@ func (t *WdPostTask) addTaskToDB(taskId harmonytask.TaskID, taskIdent wdTaskIden
 		sp_id,
 		proving_period_start,
 		deadline_index,
-		partition_index,
-
+		partition_index
 	) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10 , $11, $12, $13, $14)`,
 		taskId,
 		taskIdent.Sp_id,
@@ -647,6 +647,10 @@ func (l *LocalWorker) GenerateWinningPoSt(ctx context.Context, ppt abi.Registere
 }

 func (l *LocalWorker) GenerateWindowPoSt(ctx context.Context, ppt abi.RegisteredPoStProof, mid abi.ActorID, sectors []storiface.PostSectorChallenge, partitionIdx int, randomness abi.PoStRandomness) (storiface.WindowPoStResult, error) {
+	return l.GenerateWindowPoStAdv(ctx, ppt, mid, sectors, partitionIdx, randomness, false)
+}
+
+func (l *LocalWorker) GenerateWindowPoStAdv(ctx context.Context, ppt abi.RegisteredPoStProof, mid abi.ActorID, sectors []storiface.PostSectorChallenge, partitionIdx int, randomness abi.PoStRandomness, allowSkip bool) (storiface.WindowPoStResult, error) {
 	sb, err := l.executor()
 	if err != nil {
 		return storiface.WindowPoStResult{}, err
@@ -658,7 +662,7 @@ func (l *LocalWorker) GenerateWindowPoSt(ctx context.Context, ppt abi.Registered
 	var wg sync.WaitGroup
 	wg.Add(len(sectors))

-	vproofs := make([][]byte, len(sectors))
+	vproofs := make([][]byte, 0, len(sectors))

 	for i, s := range sectors {
 		if l.challengeThrottle != nil {
@@ -696,12 +700,13 @@ func (l *LocalWorker) GenerateWindowPoSt(ctx context.Context, ppt abi.Registered
 				return
 			}

-			vproofs[i] = vanilla
+			//vproofs[i] = vanilla // todo substitutes??
+			vproofs = append(vproofs, vanilla)
 		}(i, s)
 	}
 	wg.Wait()

-	if len(skipped) > 0 {
+	if len(skipped) > 0 && !allowSkip {
 		// This should happen rarely because before entering GenerateWindowPoSt we check all sectors by reading challenges.
 		// When it does happen, window post runner logic will just re-check sectors, and retry with newly-discovered-bad sectors skipped
 		log.Errorf("couldn't read some challenges (skipped %d)", len(skipped))