diff --git a/Makefile b/Makefile index 901c8dc00..11e839e5f 100644 --- a/Makefile +++ b/Makefile @@ -99,7 +99,10 @@ BINS+=lotus-miner curio: $(BUILD_DEPS) rm -f curio - $(GOCC) build $(GOFLAGS) -o curio ./cmd/curio + $(GOCC) build $(GOFLAGS) -o curio -ldflags " \ + -X github.com/filecoin-project/lotus/curiosrc/build.IsOpencl=$(FFI_USE_OPENCL) \ + -X github.com/filecoin-project/lotus/curiosrc/build.Commit=`git log -1 --format=%h_%cI`" \ + ./cmd/curio .PHONY: curio BINS+=curio diff --git a/cmd/curio/deps/deps.go b/cmd/curio/deps/deps.go index 56ad5d094..c9b7b315f 100644 --- a/cmd/curio/deps/deps.go +++ b/cmd/curio/deps/deps.go @@ -18,8 +18,6 @@ import ( "github.com/BurntSushi/toml" "github.com/gbrlsnchs/jwt/v3" - ds "github.com/ipfs/go-datastore" - dssync "github.com/ipfs/go-datastore/sync" logging "github.com/ipfs/go-log/v2" "github.com/samber/lo" "github.com/urfave/cli/v2" @@ -28,7 +26,6 @@ import ( "github.com/filecoin-project/go-address" "github.com/filecoin-project/go-jsonrpc/auth" "github.com/filecoin-project/go-state-types/abi" - "github.com/filecoin-project/go-statestore" "github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/api/v1api" @@ -170,7 +167,6 @@ type Deps struct { DB *harmonydb.DB // has itest capability Full api.FullNode Verif storiface.Verifier - LW *sealer.LocalWorker As *multictladdr.MultiAddressSelector Maddrs map[dtypes.MinerAddress]bool ProofTypes map[abi.RegisteredSealProof]bool @@ -311,16 +307,7 @@ Get it with: jq .PrivateKey ~/.lotus-miner/keystore/MF2XI2BNNJ3XILLQOJUXMYLUMU`, if deps.Stor == nil { deps.Stor = paths.NewRemote(deps.LocalStore, deps.Si, http.Header(sa), 10, &paths.DefaultPartialFileHandler{}) } - if deps.LW == nil { - wstates := statestore.New(dssync.MutexWrap(ds.NewMapDatastore())) - // todo localWorker isn't the abstraction layer we want to use here, we probably want to go straight to ffiwrapper - // maybe with a curio specific abstraction. 
LocalWorker does persistent call tracking which we probably - // don't need (ehh.. maybe we do, the async callback system may actually work decently well with harmonytask) - deps.LW = sealer.NewLocalWorker(sealer.WorkerConfig{ - MaxParallelChallengeReads: deps.Cfg.Proving.ParallelCheckLimit, - }, deps.Stor, deps.LocalStore, deps.Si, nil, wstates) - } if deps.Maddrs == nil { deps.Maddrs = map[dtypes.MinerAddress]bool{} } diff --git a/cmd/curio/ffi.go b/cmd/curio/ffi.go new file mode 100644 index 000000000..5c9411063 --- /dev/null +++ b/cmd/curio/ffi.go @@ -0,0 +1,71 @@ +package main + +import ( + "encoding/gob" + "fmt" + "os" + "reflect" + + "github.com/ipfs/go-cid" + "github.com/samber/lo" + "github.com/urfave/cli/v2" + "golang.org/x/xerrors" + + "github.com/filecoin-project/lotus/lib/ffiselect" + ffidirect "github.com/filecoin-project/lotus/lib/ffiselect/ffidirect" + "github.com/filecoin-project/lotus/lib/must" +) + +var ffiCmd = &cli.Command{ + Name: "ffi", + Hidden: true, + Flags: []cli.Flag{ + layersFlag, + }, + Action: func(cctx *cli.Context) (err error) { + output := os.NewFile(uintptr(3), "out") + + defer func() { + if r := recover(); r != nil { + err = fmt.Errorf("panic: %v", r) + } + if err != nil { + err = gob.NewEncoder(output).Encode(ffiselect.ValErr{Val: nil, Err: err.Error()}) + if err != nil { + panic(err) + } + } + }() + var callInfo ffiselect.FFICall + if err := gob.NewDecoder(os.Stdin).Decode(&callInfo); err != nil { + return xerrors.Errorf("ffi subprocess can not decode: %w", err) + } + + args := lo.Map(callInfo.Args, func(arg any, i int) reflect.Value { + return reflect.ValueOf(arg) + }) + + resAry := reflect.ValueOf(ffidirect.FFI{}).MethodByName(callInfo.Fn).Call(args) + res := lo.Map(resAry, func(res reflect.Value, i int) any { + return res.Interface() + }) + + err = gob.NewEncoder(output).Encode(ffiselect.ValErr{Val: res, Err: ""}) + if err != nil { + return xerrors.Errorf("ffi subprocess can not encode: %w", err) + } + + return 
output.Close() + }, +} + +func ffiSelfTest() { + val1, val2 := 12345678, must.One(cid.Parse("bafybeigdyrzt5sfp7udm7hu76uh7y26nf3efuylqabf3oclgtqy55fbzdi")) + ret1, ret2, err := ffiselect.FFISelect{}.SelfTest(val1, val2) + if err != nil { + panic("ffi self test failed:" + err.Error()) + } + if ret1 != val1 || !val2.Equals(ret2) { + panic(fmt.Sprint("ffi self test failed: values do not match: ", val1, val2, ret1, ret2)) + } +} diff --git a/cmd/curio/main.go b/cmd/curio/main.go index 9a092dad0..f6730138b 100644 --- a/cmd/curio/main.go +++ b/cmd/curio/main.go @@ -59,6 +59,7 @@ func main() { sealCmd, marketCmd, fetchParamCmd, + ffiCmd, } jaeger := tracing.SetupJaegerTracing("curio") diff --git a/cmd/curio/proving.go b/cmd/curio/proving.go index 3b5a3e0e4..c5fb78639 100644 --- a/cmd/curio/proving.go +++ b/cmd/curio/proving.go @@ -168,7 +168,7 @@ It will not send any messages to the chain. Since it can compute any deadline, o } wdPostTask, wdPoStSubmitTask, derlareRecoverTask, err := curio.WindowPostScheduler( - ctx, deps.Cfg.Fees, deps.Cfg.Proving, deps.Full, deps.Verif, deps.LW, nil, nil, + ctx, deps.Cfg.Fees, deps.Cfg.Proving, deps.Full, deps.Verif, nil, nil, deps.As, deps.Maddrs, deps.DB, deps.Stor, deps.Si, deps.Cfg.Subsystems.WindowPostMaxTasks) if err != nil { return err diff --git a/cmd/curio/run.go b/cmd/curio/run.go index 3d772b14d..c2c763287 100644 --- a/cmd/curio/run.go +++ b/cmd/curio/run.go @@ -9,10 +9,8 @@ import ( "github.com/pkg/errors" "github.com/urfave/cli/v2" "go.opencensus.io/stats" - "go.opencensus.io/tag" "golang.org/x/xerrors" - "github.com/filecoin-project/lotus/build" lcli "github.com/filecoin-project/lotus/cli" "github.com/filecoin-project/lotus/cmd/curio/deps" "github.com/filecoin-project/lotus/cmd/curio/rpc" @@ -94,11 +92,7 @@ var runCmd = &cli.Command{ log.Errorf("ensuring tempdir exists: %s", err) } - ctx, _ := tag.New(lcli.DaemonContext(cctx), - tag.Insert(metrics.Version, build.BuildVersion), - tag.Insert(metrics.Commit, 
build.CurrentCommit), - tag.Insert(metrics.NodeType, "curio"), - ) + ctx := lcli.DaemonContext(cctx) shutdownChan := make(chan struct{}) { var ctxclose func() @@ -131,6 +125,8 @@ var runCmd = &cli.Command{ return err } + go ffiSelfTest() // Panics on failure + taskEngine, err := tasks.StartTasks(ctx, dependencies) if err != nil { @@ -155,6 +151,11 @@ var runCmd = &cli.Command{ }, } +var layersFlag = &cli.StringSliceFlag{ + Name: "layers", + Usage: "list of layers to be interpreted (atop defaults). Default: base", +} + var webCmd = &cli.Command{ Name: "web", Usage: "Start Curio web interface", @@ -170,10 +171,7 @@ var webCmd = &cli.Command{ Name: "nosync", Usage: "don't check full-node sync status", }, - &cli.StringSliceFlag{ - Name: "layers", - Usage: "list of layers to be interpreted (atop defaults). Default: base", - }, + layersFlag, }, Action: func(cctx *cli.Context) error { diff --git a/cmd/curio/tasks/tasks.go b/cmd/curio/tasks/tasks.go index ce53b6305..0c2745cf1 100644 --- a/cmd/curio/tasks/tasks.go +++ b/cmd/curio/tasks/tasks.go @@ -38,7 +38,6 @@ func StartTasks(ctx context.Context, dependencies *deps.Deps) (*harmonytask.Task db := dependencies.DB full := dependencies.Full verif := dependencies.Verif - lw := dependencies.LW as := dependencies.As maddrs := dependencies.Maddrs stor := dependencies.Stor @@ -61,7 +60,7 @@ func StartTasks(ctx context.Context, dependencies *deps.Deps) (*harmonytask.Task if cfg.Subsystems.EnableWindowPost { wdPostTask, wdPoStSubmitTask, derlareRecoverTask, err := curio.WindowPostScheduler( - ctx, cfg.Fees, cfg.Proving, full, verif, lw, sender, chainSched, + ctx, cfg.Fees, cfg.Proving, full, verif, sender, chainSched, as, maddrs, db, stor, si, cfg.Subsystems.WindowPostMaxTasks) if err != nil { @@ -72,7 +71,8 @@ func StartTasks(ctx context.Context, dependencies *deps.Deps) (*harmonytask.Task } if cfg.Subsystems.EnableWinningPost { - winPoStTask := winning.NewWinPostTask(cfg.Subsystems.WinningPostMaxTasks, db, lw, verif, full, maddrs) 
+ pl := dependencies.LocalStore + winPoStTask := winning.NewWinPostTask(cfg.Subsystems.WinningPostMaxTasks, db, pl, verif, full, maddrs) activeTasks = append(activeTasks, winPoStTask) needProofParams = true } diff --git a/curiosrc/build/build.go b/curiosrc/build/build.go new file mode 100644 index 000000000..0a69f55c1 --- /dev/null +++ b/curiosrc/build/build.go @@ -0,0 +1,9 @@ +package build + +// IsOpencl is set to the value of FFI_USE_OPENCL +var IsOpencl string + +// Format: abbreviated commit hash (hex), then underscore, then ISO 8601 date +// Ex: 4c5e98f28_2024-05-17T18:42:27-04:00 +// NOTE: git date for repeatable builds. +var Commit string diff --git a/curiosrc/builder.go b/curiosrc/builder.go index 3cd4bd0cd..821974935 100644 --- a/curiosrc/builder.go +++ b/curiosrc/builder.go @@ -13,21 +13,20 @@ import ( "github.com/filecoin-project/lotus/node/config" dtypes "github.com/filecoin-project/lotus/node/modules/dtypes" "github.com/filecoin-project/lotus/storage/paths" - "github.com/filecoin-project/lotus/storage/sealer" "github.com/filecoin-project/lotus/storage/sealer/storiface" ) //var log = logging.Logger("provider") func WindowPostScheduler(ctx context.Context, fc config.CurioFees, pc config.CurioProvingConfig, - api api.FullNode, verif storiface.Verifier, lw *sealer.LocalWorker, sender *message.Sender, chainSched *chainsched.CurioChainSched, + api api.FullNode, verif storiface.Verifier, sender *message.Sender, chainSched *chainsched.CurioChainSched, as *multictladdr.MultiAddressSelector, addresses map[dtypes.MinerAddress]bool, db *harmonydb.DB, stor paths.Store, idx paths.SectorIndex, max int) (*window.WdPostTask, *window.WdPostSubmitTask, *window.WdPostRecoverDeclareTask, error) { // todo config ft := window.NewSimpleFaultTracker(stor, idx, pc.ParallelCheckLimit, time.Duration(pc.SingleCheckTimeout), time.Duration(pc.PartitionCheckTimeout)) - computeTask, err := window.NewWdPostTask(db, api, ft, lw, verif, chainSched, addresses, max) + computeTask, err := window.NewWdPostTask(db,
api, ft, stor, verif, chainSched, addresses, max, pc.ParallelCheckLimit, time.Duration(pc.SingleCheckTimeout)) if err != nil { return nil, nil, nil, err } diff --git a/curiosrc/ffi/sdr_funcs.go b/curiosrc/ffi/sdr_funcs.go index fe15a0b66..9cd8763b4 100644 --- a/curiosrc/ffi/sdr_funcs.go +++ b/curiosrc/ffi/sdr_funcs.go @@ -14,12 +14,14 @@ import ( "github.com/puzpuzpuz/xsync/v2" "golang.org/x/xerrors" + // TODO everywhere here that we call this we should call our proxy instead. ffi "github.com/filecoin-project/filecoin-ffi" commcid "github.com/filecoin-project/go-fil-commcid" "github.com/filecoin-project/go-state-types/abi" proof2 "github.com/filecoin-project/go-state-types/proof" "github.com/filecoin-project/lotus/curiosrc/proof" + "github.com/filecoin-project/lotus/lib/ffiselect" "github.com/filecoin-project/lotus/lib/harmony/harmonytask" "github.com/filecoin-project/lotus/storage/paths" "github.com/filecoin-project/lotus/storage/sealer/proofpaths" @@ -256,7 +258,7 @@ func (sb *SealCalls) TreeRC(ctx context.Context, task *harmonytask.TaskID, secto } } - sl, uns, err := ffi.SealPreCommitPhase2(p1o, fspaths.Cache, fspaths.Sealed) + sl, uns, err := ffiselect.FFISelect{}.SealPreCommitPhase2(sector.ID, p1o, fspaths.Cache, fspaths.Sealed) if err != nil { return cid.Undef, cid.Undef, xerrors.Errorf("computing seal proof: %w", err) } @@ -307,7 +309,7 @@ func (sb *SealCalls) PoRepSnark(ctx context.Context, sn storiface.SectorRef, sea return nil, xerrors.Errorf("failed to generate vanilla proof: %w", err) } - proof, err := ffi.SealCommitPhase2(vproof, sn.ID.Number, sn.ID.Miner) + proof, err := ffiselect.FFISelect{}.SealCommitPhase2(vproof, sn.ID.Number, sn.ID.Miner) if err != nil { return nil, xerrors.Errorf("computing seal proof failed: %w", err) } diff --git a/curiosrc/window/compute_do.go b/curiosrc/window/compute_do.go index fcde14d82..062ccfa30 100644 --- a/curiosrc/window/compute_do.go +++ b/curiosrc/window/compute_do.go @@ -25,6 +25,7 @@ import ( 
"github.com/filecoin-project/lotus/build" "github.com/filecoin-project/lotus/chain/actors/builtin/miner" "github.com/filecoin-project/lotus/chain/types" + "github.com/filecoin-project/lotus/lib/ffiselect" "github.com/filecoin-project/lotus/storage/sealer" "github.com/filecoin-project/lotus/storage/sealer/storiface" ) @@ -408,7 +409,7 @@ func (t *WdPostTask) generateWindowPoSt(ctx context.Context, ppt abi.RegisteredP }) } - pr, err := t.prover.GenerateWindowPoStAdv(cctx, ppt, minerID, sectors, int(partIdx), randomness, true) + pr, err := t.GenerateWindowPoStAdv(cctx, ppt, minerID, sectors, int(partIdx), randomness, true) sk := pr.Skipped if err != nil || len(sk) > 0 { @@ -440,3 +441,108 @@ func (t *WdPostTask) generateWindowPoSt(ctx context.Context, ppt abi.RegisteredP out = append(out, *postProofs) return out, skipped, retErr } + +func (t *WdPostTask) GenerateWindowPoStAdv(ctx context.Context, ppt abi.RegisteredPoStProof, mid abi.ActorID, sectors []storiface.PostSectorChallenge, partitionIdx int, randomness abi.PoStRandomness, allowSkip bool) (storiface.WindowPoStResult, error) { + + var slk sync.Mutex + var skipped []abi.SectorID + + var wg sync.WaitGroup + wg.Add(len(sectors)) + + vproofs := make([][]byte, len(sectors)) + + for i, s := range sectors { + if t.parallel != nil { + select { + case t.parallel <- struct{}{}: + case <-ctx.Done(): + return storiface.WindowPoStResult{}, xerrors.Errorf("context error waiting on challengeThrottle") + } + } + + // ctx is passed as an argument so the per-sector timeout derived below stays local to this goroutine instead of overwriting the shared outer ctx + go func(ctx context.Context, i int, s storiface.PostSectorChallenge) { + defer wg.Done() + if t.parallel != nil { + defer func() { + <-t.parallel + }() + } + + if t.challengeReadTimeout > 0 { + var cancel context.CancelFunc + ctx, cancel = context.WithTimeout(ctx, t.challengeReadTimeout) + defer cancel() + } + + vanilla, err := t.storage.GenerateSingleVanillaProof(ctx, mid, s, ppt) + slk.Lock() + defer slk.Unlock() + + if err != nil || vanilla == nil { + skipped = append(skipped, abi.SectorID{ + Miner: mid, + Number: s.SectorNumber, + })
+ log.Errorf("reading PoSt challenge for sector %d, vlen:%d, err: %s", s.SectorNumber, len(vanilla), err) + return + } + + vproofs[i] = vanilla + }(ctx, i, s) + } + wg.Wait() + + if len(skipped) > 0 && !allowSkip { + // This should happen rarely because before entering GenerateWindowPoSt we check all sectors by reading challenges. + // When it does happen, window post runner logic will just re-check sectors, and retry with newly-discovered-bad sectors skipped + log.Errorf("couldn't read some challenges (skipped %d)", len(skipped)) + + // note: can't return an error as this in an jsonrpc call + return storiface.WindowPoStResult{Skipped: skipped}, nil + } + + // compact skipped sectors + var skippedSoFar int + for i := range vproofs { + if len(vproofs[i]) == 0 { + skippedSoFar++ + continue + } + + if skippedSoFar > 0 { + vproofs[i-skippedSoFar] = vproofs[i] + } + } + + vproofs = vproofs[:len(vproofs)-skippedSoFar] + + // compute the PoSt! + res, err := t.GenerateWindowPoStWithVanilla(ctx, ppt, mid, randomness, vproofs, partitionIdx) + r := storiface.WindowPoStResult{ + PoStProofs: res, + Skipped: skipped, + } + if err != nil { + log.Errorw("generating window PoSt failed", "error", err) + return r, xerrors.Errorf("generate window PoSt with vanilla proofs: %w", err) + } + return r, nil +} + +func (t *WdPostTask) GenerateWindowPoStWithVanilla(ctx context.Context, proofType abi.RegisteredPoStProof, minerID abi.ActorID, randomness abi.PoStRandomness, proofs [][]byte, partitionIdx int) (proof.PoStProof, error) { + pp, err := ffiselect.FFISelect{}.GenerateSinglePartitionWindowPoStWithVanilla(proofType, minerID, randomness, proofs, uint(partitionIdx)) + if err != nil { + return proof.PoStProof{}, err + } + if pp == nil { + // should be impossible, but just in case do not panic + return proof.PoStProof{}, xerrors.New("postproof was nil") + } + + return proof.PoStProof{ + PoStProof: pp.PoStProof, + ProofBytes: pp.ProofBytes, + }, nil +} diff --git a/curiosrc/window/compute_task.go
b/curiosrc/window/compute_task.go index 541a2d5e2..b3f4974a6 100644 --- a/curiosrc/window/compute_task.go +++ b/curiosrc/window/compute_task.go @@ -7,6 +7,7 @@ import ( "fmt" "sort" "strings" + "time" logging "github.com/ipfs/go-log/v2" "github.com/samber/lo" @@ -29,6 +30,7 @@ import ( "github.com/filecoin-project/lotus/lib/harmony/taskhelp" "github.com/filecoin-project/lotus/lib/promise" "github.com/filecoin-project/lotus/node/modules/dtypes" + "github.com/filecoin-project/lotus/storage/paths" "github.com/filecoin-project/lotus/storage/sealer" "github.com/filecoin-project/lotus/storage/sealer/sealtasks" "github.com/filecoin-project/lotus/storage/sealer/storiface" @@ -65,13 +67,15 @@ type WdPostTask struct { db *harmonydb.DB faultTracker sealer.FaultTracker - prover ProverPoSt + storage paths.Store verifier storiface.Verifier windowPoStTF promise.Promise[harmonytask.AddTaskFunc] - actors map[dtypes.MinerAddress]bool - max int + actors map[dtypes.MinerAddress]bool + max int + parallel chan struct{} + challengeReadTimeout time.Duration } type wdTaskIdentity struct { @@ -84,22 +88,28 @@ type wdTaskIdentity struct { func NewWdPostTask(db *harmonydb.DB, api WDPoStAPI, faultTracker sealer.FaultTracker, - prover ProverPoSt, + storage paths.Store, verifier storiface.Verifier, pcs *chainsched.CurioChainSched, actors map[dtypes.MinerAddress]bool, max int, + parallel int, + challengeReadTimeout time.Duration, ) (*WdPostTask, error) { t := &WdPostTask{ db: db, api: api, faultTracker: faultTracker, - prover: prover, + storage: storage, verifier: verifier, - actors: actors, - max: max, + actors: actors, + max: max, + challengeReadTimeout: challengeReadTimeout, + } + if parallel > 0 { + t.parallel = make(chan struct{}, parallel) } if pcs != nil { diff --git a/curiosrc/winning/winning_task.go b/curiosrc/winning/winning_task.go index 920a73394..5fc402821 100644 --- a/curiosrc/winning/winning_task.go +++ b/curiosrc/winning/winning_task.go @@ -10,6 +10,7 @@ import ( 
"github.com/ipfs/go-cid" logging "github.com/ipfs/go-log/v2" + "golang.org/x/sync/errgroup" "golang.org/x/xerrors" ffi "github.com/filecoin-project/filecoin-ffi" @@ -17,6 +18,7 @@ import ( "github.com/filecoin-project/go-state-types/abi" "github.com/filecoin-project/go-state-types/crypto" "github.com/filecoin-project/go-state-types/network" + "github.com/filecoin-project/go-state-types/proof" prooftypes "github.com/filecoin-project/go-state-types/proof" "github.com/filecoin-project/lotus/api" @@ -24,11 +26,13 @@ import ( "github.com/filecoin-project/lotus/chain/gen" lrand "github.com/filecoin-project/lotus/chain/rand" "github.com/filecoin-project/lotus/chain/types" + "github.com/filecoin-project/lotus/lib/ffiselect" "github.com/filecoin-project/lotus/lib/harmony/harmonydb" "github.com/filecoin-project/lotus/lib/harmony/harmonytask" "github.com/filecoin-project/lotus/lib/harmony/resources" "github.com/filecoin-project/lotus/lib/promise" "github.com/filecoin-project/lotus/node/modules/dtypes" + "github.com/filecoin-project/lotus/storage/paths" "github.com/filecoin-project/lotus/storage/sealer/storiface" ) @@ -38,7 +42,7 @@ type WinPostTask struct { max int db *harmonydb.DB - prover ProverWinningPoSt + paths *paths.Local verifier storiface.Verifier api WinPostAPI @@ -66,15 +70,11 @@ type WinPostAPI interface { WalletSign(context.Context, address.Address, []byte) (*crypto.Signature, error) } -type ProverWinningPoSt interface { - GenerateWinningPoSt(ctx context.Context, ppt abi.RegisteredPoStProof, minerID abi.ActorID, sectorInfo []storiface.PostSectorChallenge, randomness abi.PoStRandomness) ([]prooftypes.PoStProof, error) -} - -func NewWinPostTask(max int, db *harmonydb.DB, prover ProverWinningPoSt, verifier storiface.Verifier, api WinPostAPI, actors map[dtypes.MinerAddress]bool) *WinPostTask { +func NewWinPostTask(max int, db *harmonydb.DB, pl *paths.Local, verifier storiface.Verifier, api WinPostAPI, actors map[dtypes.MinerAddress]bool) *WinPostTask { t := 
&WinPostTask{ max: max, db: db, - prover: prover, + paths: pl, verifier: verifier, api: api, actors: actors, @@ -272,7 +272,8 @@ func (t *WinPostTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (don } } - wpostProof, err = t.prover.GenerateWinningPoSt(ctx, ppt, abi.ActorID(details.SpID), sectorChallenges, prand) + // The proof must land in wpostProof, exactly as the replaced t.prover.GenerateWinningPoSt call did; discarding it leaves wpostProof unset. + wpostProof, err = t.generateWinningPost(ctx, ppt, abi.ActorID(details.SpID), sectorChallenges, prand) if err != nil { err = xerrors.Errorf("failed to compute winning post proof: %w", err) return false, err @@ -433,6 +434,42 @@ func (t *WinPostTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (don return true, nil } +func (t *WinPostTask) generateWinningPost( + ctx context.Context, + ppt abi.RegisteredPoStProof, + mid abi.ActorID, + sectors []storiface.PostSectorChallenge, + randomness abi.PoStRandomness) ([]proof.PoStProof, error) { + + // don't throttle winningPoSt + // * Always want it done asap + // * It's usually just one sector + + vproofs := make([][]byte, len(sectors)) + eg := errgroup.Group{} + + for i, s := range sectors { + i, s := i, s + eg.Go(func() error { + vanilla, err := t.paths.GenerateSingleVanillaProof(ctx, mid, s, ppt) + if err != nil { + return xerrors.Errorf("get winning sector:%d,vanila failed: %w", s.SectorNumber, err) + } + if vanilla == nil { + return xerrors.Errorf("get winning sector:%d,vanila is nil", s.SectorNumber) + } + vproofs[i] = vanilla + return nil + }) + } + if err := eg.Wait(); err != nil { + return nil, err + } + + return ffiselect.FFISelect{}.GenerateWinningPoStWithVanilla(ppt, mid, randomness, vproofs) + +} + func (t *WinPostTask) CanAccept(ids []harmonytask.TaskID, engine *harmonytask.TaskEngine) (*harmonytask.TaskID, error) { if len(ids) == 0 { // probably can't happen, but panicking is bad diff --git a/lib/ffiselect/ffidirect/ffi-direct.go b/lib/ffiselect/ffidirect/ffi-direct.go new file
mode 100644 index 000000000..23d6d28b5 --- /dev/null +++ b/lib/ffiselect/ffidirect/ffi-direct.go @@ -0,0 +1,71 @@ +// This is a wrapper around the FFI functions that allows them to be called by reflection. +// For the Curio GPU selector, see lib/ffiselect/ffiselect.go. +package ffidirect + +import ( + "github.com/ipfs/go-cid" + + ffi "github.com/filecoin-project/filecoin-ffi" + "github.com/filecoin-project/go-state-types/abi" + "github.com/filecoin-project/go-state-types/proof" +) + +// This allow reflection access to the FFI functions. +type FFI struct{} + +type ErrorString = string + +func untypeError1[R any](r R, err error) (R, string) { + if err == nil { + return r, "" + } + + return r, err.Error() +} + +func untypeError2[R1, R2 any](r1 R1, r2 R2, err error) (R1, R2, string) { + if err == nil { + return r1, r2, "" + } + + return r1, r2, err.Error() +} + +func (FFI) GenerateSinglePartitionWindowPoStWithVanilla( + proofType abi.RegisteredPoStProof, + minerID abi.ActorID, + randomness abi.PoStRandomness, + proofs [][]byte, + partitionIndex uint, +) (*ffi.PartitionProof, ErrorString) { + return untypeError1(ffi.GenerateSinglePartitionWindowPoStWithVanilla(proofType, minerID, randomness, proofs, partitionIndex)) +} + +func (FFI) SealPreCommitPhase2( + phase1Output []byte, + cacheDirPath string, + sealedSectorPath string, +) (sealedCID cid.Cid, unsealedCID cid.Cid, err ErrorString) { + return untypeError2(ffi.SealPreCommitPhase2(phase1Output, cacheDirPath, sealedSectorPath)) +} + +func (FFI) SealCommitPhase2( + phase1Output []byte, + sectorNum abi.SectorNumber, + minerID abi.ActorID, +) ([]byte, ErrorString) { + return untypeError1(ffi.SealCommitPhase2(phase1Output, sectorNum, minerID)) +} + +func (FFI) GenerateWinningPoStWithVanilla( + proofType abi.RegisteredPoStProof, + minerID abi.ActorID, + randomness abi.PoStRandomness, + proofs [][]byte, +) ([]proof.PoStProof, ErrorString) { + return untypeError1(ffi.GenerateWinningPoStWithVanilla(proofType, minerID, 
randomness, proofs)) +} + +func (FFI) SelfTest(val1 int, val2 cid.Cid) (int, cid.Cid, ErrorString) { + return untypeError2(val1, val2, nil) +} diff --git a/lib/ffiselect/ffiselect.go b/lib/ffiselect/ffiselect.go new file mode 100644 index 000000000..a299d2dc0 --- /dev/null +++ b/lib/ffiselect/ffiselect.go @@ -0,0 +1,262 @@ +package ffiselect + +import ( + "bytes" + "encoding/gob" + "io" + "os" + "os/exec" + "reflect" + "strconv" + "strings" + + "github.com/ipfs/go-cid" + "github.com/samber/lo" + "golang.org/x/xerrors" + + ffi "github.com/filecoin-project/filecoin-ffi" + "github.com/filecoin-project/go-state-types/abi" + "github.com/filecoin-project/go-state-types/proof" + + "github.com/filecoin-project/lotus/curiosrc/build" + "github.com/filecoin-project/lotus/lib/ffiselect/ffidirect" +) + +var IsCuda = build.IsOpencl != "1" + +// Get all devices from ffi +var ch chan string + +func init() { + devices, err := ffi.GetGPUDevices() + if err != nil { + panic(err) + } + ch = make(chan string, len(devices)) + for i := 0; i < len(devices); i++ { + ch <- strconv.Itoa(i) + } +} + +type ValErr struct { + Val []interface{} + Err string +} + +// This is not the one you're looking for. +type FFICall struct { + Fn string + Args []interface{} +} + +func subStrInSet(set []string, sub string) bool { + return lo.Reduce(set, func(agg bool, item string, _ int) bool { return agg || strings.Contains(item, sub) }, false) +} + +func call(logctx []any, fn string, args ...interface{}) ([]interface{}, error) { + // get dOrdinal + dOrdinal := <-ch + defer func() { + ch <- dOrdinal + }() + + p, err := os.Executable() + if err != nil { + return nil, err + } + + commandAry := []string{"ffi"} + cmd := exec.Command(p, commandAry...)
+ + // Set Visible Devices for CUDA and OpenCL + cmd.Env = append(os.Environ(), + func(isCuda bool) string { + if isCuda { + return "CUDA_VISIBLE_DEVICES=" + dOrdinal + } + return "GPU_DEVICE_ORDINAL=" + dOrdinal + }(IsCuda)) + tmpDir, err := os.MkdirTemp("", "rust-fil-proofs") + if err != nil { + return nil, err + } + cmd.Env = append(cmd.Env, "TMPDIR="+tmpDir) + + if !subStrInSet(cmd.Env, "RUST_LOG") { + cmd.Env = append(cmd.Env, "RUST_LOG=debug") + } + if !subStrInSet(cmd.Env, "FIL_PROOFS_USE_GPU_COLUMN_BUILDER") { + cmd.Env = append(cmd.Env, "FIL_PROOFS_USE_GPU_COLUMN_BUILDER=1") + } + if !subStrInSet(cmd.Env, "FIL_PROOFS_USE_GPU_TREE_BUILDER") { + cmd.Env = append(cmd.Env, "FIL_PROOFS_USE_GPU_TREE_BUILDER=1") + } + + defer func() { _ = os.RemoveAll(tmpDir) }() + + lw := NewLogWriter(logctx, os.Stderr) + + cmd.Stderr = lw + cmd.Stdout = os.Stdout + outFile, err := os.CreateTemp("", "out") + if err != nil { + return nil, err + } + // outFile only carries the child's gob reply; close and delete it on every return path so calls do not leak temp files + defer func() { + _ = outFile.Close() + _ = os.Remove(outFile.Name()) + }() + cmd.ExtraFiles = []*os.File{outFile} + var encArgs bytes.Buffer + err = gob.NewEncoder(&encArgs).Encode(FFICall{ + Fn: fn, + Args: args, + }) + if err != nil { + return nil, xerrors.Errorf("subprocess caller cannot encode: %w", err) + } + + cmd.Stdin = &encArgs + err = cmd.Run() + if err != nil { + return nil, err + } + + // seek to start + if _, err := outFile.Seek(0, io.SeekStart); err != nil { + return nil, xerrors.Errorf("failed to seek to beginning of output file: %w", err) + } + + var ve ValErr + err = gob.NewDecoder(outFile).Decode(&ve) + if err != nil { + return nil, xerrors.Errorf("subprocess caller cannot decode: %w", err) + } + if ve.Err != "" { + return nil, xerrors.Errorf("subprocess failure: %s", ve.Err) + } + if ve.Val[len(ve.Val)-1].(ffidirect.ErrorString) != "" { + return nil, xerrors.Errorf("subprocess call error: %s", ve.Val[len(ve.Val)-1].(ffidirect.ErrorString)) + } + return ve.Val, nil +} + +///////////Funcs reachable by the GPU selector./////////// +// NOTE: Changes here MUST also change ffi-direct.go + +type
FFISelect struct{} + +func (FFISelect) GenerateSinglePartitionWindowPoStWithVanilla( + proofType abi.RegisteredPoStProof, + minerID abi.ActorID, + randomness abi.PoStRandomness, + proofs [][]byte, + partitionIndex uint, +) (*ffi.PartitionProof, error) { + logctx := []any{"spid", minerID, "proof_count", len(proofs), "partition_index", partitionIndex} + + val, err := call(logctx, "GenerateSinglePartitionWindowPoStWithVanilla", proofType, minerID, randomness, proofs, partitionIndex) + if err != nil { + return nil, err + } + return val[0].(*ffi.PartitionProof), nil +} +func (FFISelect) SealPreCommitPhase2( + sid abi.SectorID, + phase1Output []byte, + cacheDirPath string, + sealedSectorPath string, +) (sealedCID cid.Cid, unsealedCID cid.Cid, err error) { + logctx := []any{"sector", sid} + + val, err := call(logctx, "SealPreCommitPhase2", phase1Output, cacheDirPath, sealedSectorPath) + if err != nil { + return cid.Undef, cid.Undef, err + } + return val[0].(cid.Cid), val[1].(cid.Cid), nil +} + +func (FFISelect) SealCommitPhase2( + phase1Output []byte, + sectorNum abi.SectorNumber, + minerID abi.ActorID, +) ([]byte, error) { + logctx := []any{"sector", abi.SectorID{Miner: minerID, Number: sectorNum}} + + val, err := call(logctx, "SealCommitPhase2", phase1Output, sectorNum, minerID) + if err != nil { + return nil, err + } + + return val[0].([]byte), nil +} + +func (FFISelect) GenerateWinningPoStWithVanilla( + proofType abi.RegisteredPoStProof, + minerID abi.ActorID, + randomness abi.PoStRandomness, + proofs [][]byte, +) ([]proof.PoStProof, error) { + logctx := []any{"proof_type", proofType, "miner_id", minerID} + + val, err := call(logctx, "GenerateWinningPoStWithVanilla", proofType, minerID, randomness, proofs) + if err != nil { + return nil, err + } + return val[0].([]proof.PoStProof), nil +} + +func (FFISelect) SelfTest(val1 int, val2 cid.Cid) (int, cid.Cid, error) { + val, err := call([]any{"selftest", "true"}, "SelfTest", val1, val2) + if err != nil { + return 0, 
cid.Undef, err + } + return val[0].(int), val[1].(cid.Cid), nil +} + +// ////////////////////////// + +func init() { + registeredTypes := []any{ + ValErr{}, + FFICall{}, + cid.Cid{}, + abi.RegisteredPoStProof(0), + abi.ActorID(0), + abi.PoStRandomness{}, + abi.SectorNumber(0), + ffi.PartitionProof{}, + proof.PoStProof{}, + abi.RegisteredPoStProof(0), + } + var registeredTypeNames = make(map[string]struct{}) + + //Ensure all methods are implemented: + // This is designed to fail for happy-path runs + // and should never actually impact curio users. + for _, t := range registeredTypes { + gob.Register(t) + registeredTypeNames[reflect.TypeOf(t).PkgPath()+"."+reflect.TypeOf(t).Name()] = struct{}{} + } + + to := reflect.TypeOf(ffidirect.FFI{}) + for m := 0; m < to.NumMethod(); m++ { + tm := to.Method(m) + tf := tm.Func + for i := 1; i < tf.Type().NumIn(); i++ { // skipping first arg (struct type) + in := tf.Type().In(i) + nm := in.PkgPath() + "." + in.Name() + if _, ok := registeredTypeNames[nm]; in.PkgPath() != "" && !ok { // built-ins ok + panic("ffiSelect: unregistered type: " + nm + " from " + tm.Name + " arg: " + strconv.Itoa(i)) + } + } + for i := 0; i < tf.Type().NumOut(); i++ { + out := tf.Type().Out(i) + nm := out.PkgPath() + "." 
+ out.Name() + if _, ok := registeredTypeNames[nm]; out.PkgPath() != "" && !ok { // built-ins ok + panic("ffiSelect: unregistered type: " + nm + " from " + tm.Name + " arg: " + strconv.Itoa(i)) + } + } + } +} diff --git a/lib/ffiselect/logparse.go b/lib/ffiselect/logparse.go new file mode 100644 index 000000000..3508a1f89 --- /dev/null +++ b/lib/ffiselect/logparse.go @@ -0,0 +1,94 @@ +package ffiselect + +import ( + "bufio" + "bytes" + "io" + "regexp" + "strings" + "time" + + logging "github.com/ipfs/go-log/v2" + "go.uber.org/zap" +) + +var log = logging.Logger("ffiselect") + +type LogWriter struct { + ctx []any + errOut io.Writer + re *regexp.Regexp +} + +func NewLogWriter(logctx []any, errOut io.Writer) *LogWriter { + re := regexp.MustCompile(`^(\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{3})\s+(\w+)\s+(.*)$`) + return &LogWriter{ + ctx: logctx, + errOut: errOut, + re: re, + } +} + +func (lw *LogWriter) Write(p []byte) (n int, err error) { + reader := bufio.NewReader(bytes.NewReader(p)) + for { + line, err := reader.ReadBytes('\n') + if err == io.EOF { + // p did not end in '\n': forward the unterminated tail as-is instead of silently dropping it + if len(line) > 0 { + if _, werr := lw.errOut.Write(line); werr != nil { + return 0, werr + } + } + break + } + if err != nil { + return 0, err + } + + lineStr := string(line) + // trim trailing \n + lineStr = strings.TrimSpace(lineStr) + + matches := lw.re.FindStringSubmatch(lineStr) + if matches == nil { + // Line didn't match the expected format, write it to stderr as-is + _, err := lw.errOut.Write(line) + if err != nil { + return 0, err + } + continue + } + + timestamp, logLevel, message := matches[1], matches[2], matches[3] + logTime, err := time.Parse("2006-01-02T15:04:05.000", timestamp) + if err != nil { + _, err := lw.errOut.Write(line) + if err != nil { + return 0, err + } + continue + } + + var zapLevel zap.AtomicLevel + switch logLevel { + case "DEBUG": + zapLevel = zap.NewAtomicLevelAt(zap.DebugLevel) + case "INFO": + zapLevel = zap.NewAtomicLevelAt(zap.InfoLevel) + case "WARN": + zapLevel = zap.NewAtomicLevelAt(zap.WarnLevel) + case "ERROR": + zapLevel = zap.NewAtomicLevelAt(zap.ErrorLevel) + default:
+ _, err := lw.errOut.Write(line) + if err != nil { + return 0, err + } + continue + } + + log.With(zap.Time("timestamp", logTime)).Logw(zapLevel.Level(), message, lw.ctx...) + } + return len(p), nil +} diff --git a/storage/sealer/ffiwrapper/sealer_cgo.go b/storage/sealer/ffiwrapper/sealer_cgo.go index f16130331..80f06ad0c 100644 --- a/storage/sealer/ffiwrapper/sealer_cgo.go +++ b/storage/sealer/ffiwrapper/sealer_cgo.go @@ -1421,6 +1421,13 @@ func GenerateUnsealedCID(proofType abi.RegisteredSealProof, pieces []abi.PieceIn return ffi.GenerateUnsealedCID(proofType, allPieces) } +func (sb *Sealer) GenerateSingleVanillaProof( + replica ffi.PrivateSectorInfo, + challenges []uint64, +) ([]byte, error) { + return ffi.GenerateSingleVanillaProof(replica, challenges) +} + func (sb *Sealer) GenerateWinningPoStWithVanilla(ctx context.Context, proofType abi.RegisteredPoStProof, minerID abi.ActorID, randomness abi.PoStRandomness, vanillas [][]byte) ([]proof.PoStProof, error) { return ffi.GenerateWinningPoStWithVanilla(proofType, minerID, randomness, vanillas) }