diff --git a/cmd/lotus-bench/simple.go b/cmd/lotus-bench/simple.go index 09df22078..35d909ffb 100644 --- a/cmd/lotus-bench/simple.go +++ b/cmd/lotus-bench/simple.go @@ -308,7 +308,36 @@ var simplePreCommit2 = &cli.Command{ Name: "synthetic", Usage: "generate synthetic PoRep proofs", }, + &cli.StringFlag{ + Name: "external-pc2", + Usage: "command for computing PC2 externally", + }, }, + Description: `Compute PreCommit2 inputs and seal a sector. + +--external-pc2 can be used to compute the PreCommit2 inputs externally. +The flag behaves similarly to the related lotus-worker flag, using it in +lotus-bench may be useful for testing if the external PreCommit2 command is +invoked correctly. + +The command will be called with a number of environment variables set: +* EXTSEAL_PC2_SECTOR_NUM: the sector number +* EXTSEAL_PC2_SECTOR_MINER: the miner id +* EXTSEAL_PC2_PROOF_TYPE: the proof type +* EXTSEAL_PC2_SECTOR_SIZE: the sector size in bytes +* EXTSEAL_PC2_CACHE: the path to the cache directory +* EXTSEAL_PC2_SEALED: the path to the sealed sector file (initialized with unsealed data by the caller) +* EXTSEAL_PC2_PC1OUT: output from rust-fil-proofs precommit1 phase (base64 encoded json) + +The command is expected to: +* Create cache sc-02-data-tree-r* files +* Create cache sc-02-data-tree-c* files +* Create cache p_aux / t_aux files +* Transform the sealed file in place + +Example invocation of lotus-bench as external executor: +'./lotus-bench simple precommit2 --sector-size $EXTSEAL_PC2_SECTOR_SIZE $EXTSEAL_PC2_SEALED $EXTSEAL_PC2_CACHE $EXTSEAL_PC2_PC1OUT' +`, ArgsUsage: "[sealed] [cache] [pc1 out]", Action: func(cctx *cli.Context) error { ctx := cctx.Context @@ -333,7 +362,18 @@ var simplePreCommit2 = &cli.Command{ storiface.FTSealed: cctx.Args().Get(0), storiface.FTCache: cctx.Args().Get(1), } - sealer, err := ffiwrapper.New(pp) + + var opts []ffiwrapper.FFIWrapperOpt + + if cctx.IsSet("external-pc2") { + extSeal := ffiwrapper.ExternalSealer{ + PreCommit2: ffiwrapper.MakeExternPrecommit2(cctx.String("external-pc2")), + } + + opts = append(opts, ffiwrapper.WithExternalSealCalls(extSeal)) + } + + sealer, err := ffiwrapper.New(pp, opts...) if err != nil { return err } diff --git a/cmd/lotus-worker/main.go b/cmd/lotus-worker/main.go index 257dac800..41af11bdd 100644 --- a/cmd/lotus-worker/main.go +++ b/cmd/lotus-worker/main.go @@ -39,6 +39,7 @@ import ( "github.com/filecoin-project/lotus/node/repo" "github.com/filecoin-project/lotus/storage/paths" "github.com/filecoin-project/lotus/storage/sealer" + "github.com/filecoin-project/lotus/storage/sealer/ffiwrapper" "github.com/filecoin-project/lotus/storage/sealer/sealtasks" "github.com/filecoin-project/lotus/storage/sealer/storiface" ) @@ -284,7 +285,36 @@ var runCmd = &cli.Command{ Value: true, DefaultText: "inherits --addpiece", }, + &cli.StringFlag{ + Name: "external-pc2", + Usage: "command for computing PC2 externally", + }, }, + Description: `Run lotus-worker. + +--external-pc2 can be used to compute the PreCommit2 inputs externally. +The flag behaves similarly to the related lotus-worker flag, using it in +lotus-bench may be useful for testing if the external PreCommit2 command is +invoked correctly. + +The command will be called with a number of environment variables set: +* EXTSEAL_PC2_SECTOR_NUM: the sector number +* EXTSEAL_PC2_SECTOR_MINER: the miner id +* EXTSEAL_PC2_PROOF_TYPE: the proof type +* EXTSEAL_PC2_SECTOR_SIZE: the sector size in bytes +* EXTSEAL_PC2_CACHE: the path to the cache directory +* EXTSEAL_PC2_SEALED: the path to the sealed sector file (initialized with unsealed data by the caller) +* EXTSEAL_PC2_PC1OUT: output from rust-fil-proofs precommit1 phase (base64 encoded json) + +The command is expected to: +* Create cache sc-02-data-tree-r* files +* Create cache sc-02-data-tree-c* files +* Create cache p_aux / t_aux files +* Transform the sealed file in place + +Example invocation of lotus-bench as external executor: +'./lotus-bench simple precommit2 --sector-size $EXTSEAL_PC2_SECTOR_SIZE $EXTSEAL_PC2_SEALED $EXTSEAL_PC2_CACHE $EXTSEAL_PC2_PC1OUT' +`, Before: func(cctx *cli.Context) error { if cctx.IsSet("address") { log.Warnf("The '--address' flag is deprecated, it has been replaced by '--listen'") @@ -623,18 +653,32 @@ var runCmd = &cli.Command{ fh.ServeHTTP(w, r) } + // Parse ffi executor flags + + var ffiOpts []ffiwrapper.FFIWrapperOpt + + if cctx.IsSet("external-pc2") { + extSeal := ffiwrapper.ExternalSealer{ + PreCommit2: ffiwrapper.MakeExternPrecommit2(cctx.String("external-pc2")), + } + + ffiOpts = append(ffiOpts, ffiwrapper.WithExternalSealCalls(extSeal)) + } + // Create / expose the worker wsts := statestore.New(namespace.Wrap(ds, modules.WorkerCallsPrefix)) workerApi := &sealworker.Worker{ - LocalWorker: sealer.NewLocalWorker(sealer.WorkerConfig{ - TaskTypes: taskTypes, - NoSwap: cctx.Bool("no-swap"), - MaxParallelChallengeReads: cctx.Int("post-parallel-reads"), - ChallengeReadTimeout: cctx.Duration("post-read-timeout"), - Name: cctx.String("name"), - }, remote, localStore, nodeApi, nodeApi, wsts), + LocalWorker: sealer.NewLocalWorkerWithExecutor( + sealer.FFIExec(ffiOpts...), + sealer.WorkerConfig{ + TaskTypes: taskTypes, + NoSwap: cctx.Bool("no-swap"), + MaxParallelChallengeReads: cctx.Int("post-parallel-reads"), + ChallengeReadTimeout: cctx.Duration("post-read-timeout"), + Name: cctx.String("name"), + }, os.LookupEnv, remote, localStore, nodeApi, nodeApi, wsts), LocalStore: localStore, Storage: lr, } diff --git a/documentation/en/cli-lotus-worker.md b/documentation/en/cli-lotus-worker.md index b01b721eb..cf0b9fd36 100644 --- a/documentation/en/cli-lotus-worker.md +++ b/documentation/en/cli-lotus-worker.md @@ -34,6 +34,33 @@ NAME: USAGE: lotus-worker run [command options] [arguments...] +DESCRIPTION: + Run lotus-worker. + + --external-pc2 can be used to compute the PreCommit2 inputs externally. + The flag behaves similarly to the related lotus-worker flag, using it in + lotus-bench may be useful for testing if the external PreCommit2 command is + invoked correctly. + + The command will be called with a number of environment variables set: + * EXTSEAL_PC2_SECTOR_NUM: the sector number + * EXTSEAL_PC2_SECTOR_MINER: the miner id + * EXTSEAL_PC2_PROOF_TYPE: the proof type + * EXTSEAL_PC2_SECTOR_SIZE: the sector size in bytes + * EXTSEAL_PC2_CACHE: the path to the cache directory + * EXTSEAL_PC2_SEALED: the path to the sealed sector file (initialized with unsealed data by the caller) + * EXTSEAL_PC2_PC1OUT: output from rust-fil-proofs precommit1 phase (base64 encoded json) + + The command is expected to: + * Create cache sc-02-data-tree-r* files + * Create cache sc-02-data-tree-c* files + * Create cache p_aux / t_aux files + * Transform the sealed file in place + + Example invocation of lotus-bench as external executor: + './lotus-bench simple precommit2 --sector-size $EXTSEAL_PC2_SECTOR_SIZE $EXTSEAL_PC2_SEALED $EXTSEAL_PC2_CACHE $EXTSEAL_PC2_PC1OUT' + + OPTIONS: --listen value host address and port the worker api will listen on (default: "0.0.0.0:3456") [$LOTUS_WORKER_LISTEN] --no-local-storage don't use storageminer repo for sector storage (default: false) [$LOTUS_WORKER_NO_LOCAL_STORAGE] @@ -57,6 +84,7 @@ OPTIONS: --timeout value used when 'listen' is unspecified. must be a valid duration recognized by golang's time.ParseDuration function (default: "30m") [$LOTUS_WORKER_TIMEOUT] --http-server-timeout value (default: "30s") --data-cid Run the data-cid task. true|false (default: inherits --addpiece) + --external-pc2 value command for computing PC2 externally --help, -h show help ``` diff --git a/go.mod b/go.mod index 697fcc712..a67dd8d4e 100644 --- a/go.mod +++ b/go.mod @@ -144,6 +144,7 @@ require ( github.com/samber/lo v1.38.1 github.com/stretchr/testify v1.8.4 github.com/syndtr/goleveldb v1.0.1-0.20210819022825-2ae1ddf74ef7 + github.com/triplewz/poseidon v0.0.0-20220525065023-a7cdb0e183e7 github.com/urfave/cli/v2 v2.25.5 github.com/whyrusleeping/bencher v0.0.0-20190829221104-bb6607aa8bba github.com/whyrusleeping/cbor-gen v0.0.0-20230923211252-36a87e1ba72f @@ -362,3 +363,5 @@ require ( replace github.com/filecoin-project/filecoin-ffi => ./extern/filecoin-ffi replace github.com/filecoin-project/test-vectors => ./extern/test-vectors + +replace github.com/triplewz/poseidon => github.com/magik6k/poseidon v0.0.0-neptune diff --git a/go.sum b/go.sum index 67b264b55..638d8ef26 100644 --- a/go.sum +++ b/go.sum @@ -991,6 +991,8 @@ github.com/leodido/go-urn v1.2.1/go.mod h1:zt4jvISO2HfUBqxjfIshjdMTYS56ZS/qv49ic github.com/leodido/go-urn v1.2.4 h1:XlAE/cm/ms7TE/VMVoduSpNBoyc2dOxHs5MZSwAN63Q= github.com/leodido/go-urn v1.2.4/go.mod h1:7ZrI8mTSeBSHl/UaRyKQW1qZeMgak41ANeCNaVckg+4= github.com/lib/pq v1.10.0 h1:Zx5DJFEYQXio93kgXnQ09fXNiUKsqv4OUEu2UtGcB1E= +github.com/leanovate/gopter v0.2.9 h1:fQjYxZaynp97ozCzfOyOuAGOU4aU/z37zf/tOujFk7c= +github.com/leanovate/gopter v0.2.9/go.mod h1:U2L/78B+KVFIx2VmW6onHJQzXtFb+p5y3y2Sh+Jxxv8= github.com/libp2p/go-addr-util v0.0.1/go.mod h1:4ac6O7n9rIAKB1dnd+s8IbbMXkt+oBpzX4/+RACcnlQ= github.com/libp2p/go-addr-util v0.0.2/go.mod h1:Ecd6Fb3yIuLzq4bD7VcywcVSBtefcAwnUISBM3WG15E= github.com/libp2p/go-buffer-pool v0.0.1/go.mod h1:xtyIz9PMobb13WaxR6Zo1Pd1zXJKYg0a8KiIvDp3TzQ= @@ -1209,6 +1211,8 @@ github.com/lyft/protoc-gen-validate v0.0.13/go.mod h1:XbGvPuh87YZc5TdIa2/I4pLk0Q github.com/magefile/mage v1.9.0 h1:t3AU2wNwehMCW97vuqQLtw6puppWXHO+O2MHo5a50XE= github.com/magefile/mage v1.9.0/go.mod h1:z5UZb/iS3GoOSn0JgWuiw7dxlurVYTu+/jHXqQg881A= github.com/magiconair/properties v1.8.0/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ= +github.com/magik6k/poseidon v0.0.0-neptune h1:Dfz15iiYGGE9Esvn8pZFlbiiCrHuyZDxm6LGXQfaf9c= +github.com/magik6k/poseidon v0.0.0-neptune/go.mod h1:QYG1d0B4YZD7TgF6qZndTTu4rxUGFCCZAQRDanDj+9c= github.com/mailru/easyjson v0.0.0-20180823135443-60711f1a8329/go.mod h1:C1wdFJiN94OJF2b5HbByQZoLdCWB1Yqtg26g4irojpc= github.com/mailru/easyjson v0.0.0-20190312143242-1de009706dbe/go.mod h1:C1wdFJiN94OJF2b5HbByQZoLdCWB1Yqtg26g4irojpc= github.com/mailru/easyjson v0.0.0-20190614124828-94de47d64c63/go.mod h1:C1wdFJiN94OJF2b5HbByQZoLdCWB1Yqtg26g4irojpc= @@ -2081,6 +2085,7 @@ golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20210514084401-e8d321eab015/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210603081109-ebe580a85c40/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20210616094352-59db8d763f22/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210806184541-e5e7981a1069/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210927094055-39ccf1dd6fa6/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= diff --git a/storage/sealer/commitment/commd.go b/storage/sealer/commitment/commd.go new file mode 100644 index 000000000..b7dc9998a --- /dev/null +++ b/storage/sealer/commitment/commd.go @@ -0,0 +1,34 @@ +package commitment + +import ( + "io" + "os" + "path/filepath" +) + +const treeDFile = "sc-02-data-tree-d.dat" + +// TreeDCommD reads CommD from tree-d +func TreeDCommD(cache string) ([32]byte, error) { + // Open the tree-d file for reading + file, err := os.Open(filepath.Join(cache, treeDFile)) + if err != nil { + return [32]byte{}, err + } + defer file.Close() // nolint:errcheck + + // Seek to 32 bytes from the end of the file + _, err = file.Seek(-32, io.SeekEnd) + if err != nil { + return [32]byte{}, err + } + + // Read the last 32 bytes + var commD [32]byte + _, err = file.Read(commD[:]) + if err != nil { + return [32]byte{}, err + } + + return commD, nil +} diff --git a/storage/sealer/commitment/commr.go b/storage/sealer/commitment/commr.go new file mode 100644 index 000000000..d5f5b0844 --- /dev/null +++ b/storage/sealer/commitment/commr.go @@ -0,0 +1,64 @@ +package commitment + +import ( + "math/big" + "os" + "path/filepath" + + "github.com/triplewz/poseidon" + ff "github.com/triplewz/poseidon/bls12_381" + "golang.org/x/xerrors" +) + +const pauxFile = "p_aux" + +func CommR(commC, commRLast [32]byte) ([32]byte, error) { + // reverse commC and commRLast so that endianness is correct + for i, j := 0, len(commC)-1; i < j; i, j = i+1, j-1 { + commC[i], commC[j] = commC[j], commC[i] + commRLast[i], commRLast[j] = commRLast[j], commRLast[i] + } + + input_a := new(big.Int) + input_a.SetBytes(commC[:]) + input_b := new(big.Int) + input_b.SetBytes(commRLast[:]) + input := []*big.Int{input_a, input_b} + + cons, err := poseidon.GenPoseidonConstants(3) + if err != nil { + return [32]byte{}, err + } + + h1, err := poseidon.Hash(input, cons, poseidon.OptimizedStatic) + if err != nil { + return [32]byte{}, err + } + + h1element := new(ff.Element).SetBigInt(h1).Bytes() + + // reverse the bytes so that endianness is correct + for i, j := 0, len(h1element)-1; i < j; i, j = i+1, j-1 { + h1element[i], h1element[j] = h1element[j], h1element[i] + } + + return h1element, nil +} + +// PAuxCommR reads p_aux and computes CommR +func PAuxCommR(cache string) ([32]byte, error) { + commCcommRLast, err := os.ReadFile(filepath.Join(cache, pauxFile)) + if err != nil { + return [32]byte{}, err + } + + if len(commCcommRLast) != 64 { + return [32]byte{}, xerrors.Errorf("invalid commCcommRLast length %d", len(commCcommRLast)) + } + + var commC, commRLast [32]byte + copy(commC[:], commCcommRLast[:32]) + copy(commRLast[:], commCcommRLast[32:]) + + return CommR(commC, commRLast) +} diff --git a/storage/sealer/commitment/commr_test.go b/storage/sealer/commitment/commr_test.go new file mode 100644 index 000000000..107f483d2 --- /dev/null +++ b/storage/sealer/commitment/commr_test.go @@ -0,0 +1,35 @@ +package commitment + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func TestCommR(t *testing.T) { + var commC = [32]byte{ + 0x09, 0x1e, 0x07, 0x3b, 0x98, 0x2f, 0x66, 0xf0, + 0x13, 0xc0, 0x26, 0xda, 0x6e, 0x54, 0xd8, 0x7d, + 0xbf, 0x8b, 0xba, 0x84, 0x8e, 0xf5, 0x7a, 0x55, + 0x29, 0xc7, 0xe7, 0xf7, 0x2c, 0x82, 0x88, 0x43, + } + + var commRLast = [32]byte{ + 0xf0, 0xc5, 0x78, 0x5c, 0x6c, 0x8c, 0xf6, 0x2d, + 0x96, 0x8b, 0x1e, 0xcd, 0x68, 0xed, 0xb9, 0xd9, + 0x1e, 0xb9, 0x44, 0x5c, 0x78, 0x58, 0xa6, 0x00, + 0x26, 0xf8, 0x82, 0x68, 0x60, 0xf7, 0xe7, 0x68, + } + + res, err := CommR(commC, commRLast) + require.NoError(t, err) + + var expected = [32]byte{ + 0xe6, 0x74, 0xd1, 0x9e, 0x6c, 0xe7, 0xfc, 0xf3, + 0x3b, 0xbf, 0xd9, 0xb3, 0x43, 0xa0, 0xce, 0xb1, + 0x2d, 0x28, 0x31, 0xd1, 0xda, 0x54, 0x31, 0x61, + 0x89, 0x1e, 0xbc, 0xca, 0xd2, 0xc6, 0xdb, 0x01, + } + + require.Equal(t, res, expected) +} diff --git a/storage/sealer/ffiwrapper/extern_pc2.go b/storage/sealer/ffiwrapper/extern_pc2.go new file mode 100644 index 000000000..de9246efc --- /dev/null +++ b/storage/sealer/ffiwrapper/extern_pc2.go @@ -0,0 +1,88 @@ +package ffiwrapper + +import ( + "context" + "encoding/base64" + "fmt" + "os" + "os/exec" + + "github.com/ipfs/go-cid" + "golang.org/x/xerrors" + + commcid "github.com/filecoin-project/go-fil-commcid" + + "github.com/filecoin-project/lotus/storage/sealer/commitment" + "github.com/filecoin-project/lotus/storage/sealer/storiface" +) + +// MakeExternPrecommit2 creates an implementation of ExternPrecommit2 backed by +// an external command specified by the command string. +// +// The command will be called with a number of environment variables set: +// * EXTSEAL_PC2_SECTOR_NUM: the sector number +// * EXTSEAL_PC2_SECTOR_MINER: the miner id +// * EXTSEAL_PC2_PROOF_TYPE: the proof type +// * EXTSEAL_PC2_SECTOR_SIZE: the sector size in bytes +// * EXTSEAL_PC2_CACHE: the path to the cache directory +// * EXTSEAL_PC2_SEALED: the path to the sealed sector file (initialized with unsealed data by the caller) +// * EXTSEAL_PC2_PC1OUT: output from rust-fil-proofs precommit1 phase (base64 encoded json) +// +// The command is expected to: +// * Create cache sc-02-data-tree-r* files +// * Create cache sc-02-data-tree-c* files +// * Create cache p_aux / t_aux files +// * Transform the sealed file in place +func MakeExternPrecommit2(command string) ExternPrecommit2 { + return func(ctx context.Context, sector storiface.SectorRef, cache, sealed string, pc1out storiface.PreCommit1Out) (sealedCID cid.Cid, unsealedCID cid.Cid, err error) { + ssize, err := sector.ProofType.SectorSize() + if err != nil { + return cid.Undef, cid.Undef, err + } + + // Set environment variables for the external command + env := []string{ + "EXTSEAL_PC2_SECTOR_NUM=" + sector.ID.Number.String(), + "EXTSEAL_PC2_SECTOR_MINER=" + sector.ID.Miner.String(), + "EXTSEAL_PC2_PROOF_TYPE=" + fmt.Sprintf("%d", sector.ProofType), + "EXTSEAL_PC2_SECTOR_SIZE=" + fmt.Sprintf("%d", ssize), + "EXTSEAL_PC2_CACHE=" + cache, + "EXTSEAL_PC2_SEALED=" + sealed, + "EXTSEAL_PC2_PC1OUT=" + base64.StdEncoding.EncodeToString(pc1out), + } + + log.Infow("running external sealing call", "method", "precommit2", "command", command, "env", env) + + // Create and run the external command + cmd := exec.CommandContext(ctx, "sh", "-c", command) + cmd.Env = env + cmd.Stdout = os.Stdout + cmd.Stderr = os.Stderr + + if err := cmd.Run(); err != nil { + return cid.Cid{}, cid.Cid{}, xerrors.Errorf("external command error: %w", err) + } + + commr, err := commitment.PAuxCommR(cache) + if err != nil { + return cid.Undef, cid.Undef, xerrors.Errorf("reading p_aux: %w", err) + } + + sealedCID, err = commcid.ReplicaCommitmentV1ToCID(commr[:]) + if err != nil { + return cid.Cid{}, cid.Cid{}, err + } + + commd, err := commitment.TreeDCommD(cache) + if err != nil { + return cid.Undef, cid.Undef, xerrors.Errorf("reading CommD from tree-d: %w", err) + } + + unsealedCID, err = commcid.DataCommitmentV1ToCID(commd[:]) + if err != nil { + return cid.Cid{}, cid.Cid{}, err + } + + return sealedCID, unsealedCID, nil + } +} diff --git a/storage/sealer/ffiwrapper/sealer.go b/storage/sealer/ffiwrapper/sealer.go index 39cb8fa1b..00374ddf5 100644 --- a/storage/sealer/ffiwrapper/sealer.go +++ b/storage/sealer/ffiwrapper/sealer.go @@ -1,13 +1,28 @@ package ffiwrapper import ( + "context" + + "github.com/ipfs/go-cid" logging "github.com/ipfs/go-log/v2" + + "github.com/filecoin-project/lotus/storage/sealer/storiface" ) var log = logging.Logger("ffiwrapper") +type ExternPrecommit2 func(ctx context.Context, sector storiface.SectorRef, cache, sealed string, pc1out storiface.PreCommit1Out) (sealedCID cid.Cid, unsealedCID cid.Cid, err error) + +type ExternalSealer struct { + PreCommit2 ExternPrecommit2 +} + type Sealer struct { - sectors SectorProvider + sectors SectorProvider + + // externCalls cointain overrides for calling alternative sealing logic + externCalls ExternalSealer + stopping chan struct{} } diff --git a/storage/sealer/ffiwrapper/sealer_cgo.go b/storage/sealer/ffiwrapper/sealer_cgo.go index 812a69fa7..04e891665 100644 --- a/storage/sealer/ffiwrapper/sealer_cgo.go +++ b/storage/sealer/ffiwrapper/sealer_cgo.go @@ -40,10 +40,30 @@ import ( var _ storiface.Storage = &Sealer{} -func New(sectors SectorProvider) (*Sealer, error) { +type FFIWrapperOpts struct { + ext ExternalSealer +} + +type FFIWrapperOpt func(*FFIWrapperOpts) + +func WithExternalSealCalls(ext ExternalSealer) FFIWrapperOpt { + return func(o *FFIWrapperOpts) { + o.ext = ext + } +} + +func New(sectors SectorProvider, opts ...FFIWrapperOpt) (*Sealer, error) { + options := &FFIWrapperOpts{} + + for _, o := range opts { + o(options) + } + sb := &Sealer{ sectors: sectors, + externCalls: options.ext, + stopping: make(chan struct{}), } @@ -885,9 +905,18 @@ func (sb *Sealer) SealPreCommit2(ctx context.Context, sector storiface.SectorRef } defer done() - sealedCID, unsealedCID, err := ffi.SealPreCommitPhase2(phase1Out, paths.Cache, paths.Sealed) - if err != nil { - return storiface.SectorCids{}, xerrors.Errorf("presealing sector %d (%s): %w", sector.ID.Number, paths.Unsealed, err) + var sealedCID, unsealedCID cid.Cid + + if sb.externCalls.PreCommit2 == nil { + sealedCID, unsealedCID, err = ffi.SealPreCommitPhase2(phase1Out, paths.Cache, paths.Sealed) + if err != nil { + return storiface.SectorCids{}, xerrors.Errorf("presealing sector %d (%s): %w", sector.ID.Number, paths.Unsealed, err) + } + } else { + sealedCID, unsealedCID, err = sb.externCalls.PreCommit2(ctx, sector, paths.Cache, paths.Sealed, phase1Out) + if err != nil { + return storiface.SectorCids{}, xerrors.Errorf("presealing sector (extern-pc2) %d (%s): %w", sector.ID.Number, paths.Unsealed, err) + } } ssize, err := sector.ProofType.SectorSize() diff --git a/storage/sealer/ffiwrapper/sealer_test.go b/storage/sealer/ffiwrapper/sealer_test.go index da1b98429..59821e795 100644 --- a/storage/sealer/ffiwrapper/sealer_test.go +++ b/storage/sealer/ffiwrapper/sealer_test.go @@ -31,6 +31,7 @@ import ( "github.com/filecoin-project/lotus/chain/actors/policy" "github.com/filecoin-project/lotus/storage/pipeline/lib/nullreader" + "github.com/filecoin-project/lotus/storage/sealer/commitment" "github.com/filecoin-project/lotus/storage/sealer/ffiwrapper/basicfs" "github.com/filecoin-project/lotus/storage/sealer/storiface" ) @@ -1093,7 +1094,11 @@ func TestDCAPCloses(t *testing.T) { } func TestSealAndVerifySynth(t *testing.T) { + origSealProofType := sealProofType sealProofType = abi.RegisteredSealProof_StackedDrg2KiBV1_1_Feat_SyntheticPoRep + t.Cleanup(func() { + sealProofType = origSealProofType + }) if testing.Short() { t.Skip("skipping test in short mode") @@ -1214,6 +1219,61 @@ func (c *closeAssertReader) Close() error { var _ io.Closer = &closeAssertReader{} +func TestSealCommDRInGo(t *testing.T) { + if testing.Short() { + t.Skip("skipping test in short mode") + } + + defer requireFDsClosed(t, openFDs(t)) + + cdir, err := os.MkdirTemp("", "sbtest-c-") + require.NoError(t, err) + miner := abi.ActorID(123) + + sp := &basicfs.Provider{ + Root: cdir, + } + sb, err := New(sp) + require.NoError(t, err) + + t.Cleanup(func() { + if t.Failed() { + fmt.Printf("not removing %s\n", cdir) + return + } + if err := os.RemoveAll(cdir); err != nil { + t.Error(err) + } + }) + + si := storiface.SectorRef{ + ID: abi.SectorID{Miner: miner, Number: 1}, + ProofType: sealProofType, + } + + s := seal{ref: si} + + s.precommit(t, sb, si, func() {}) + + p, _, err := sp.AcquireSector(context.Background(), si, storiface.FTCache, storiface.FTNone, storiface.PathStorage) + require.NoError(t, err) + + commr, err := commitment.PAuxCommR(p.Cache) + require.NoError(t, err) + + commd, err := commitment.TreeDCommD(p.Cache) + require.NoError(t, err) + + sealCid, err := commcid.ReplicaCommitmentV1ToCID(commr[:]) + require.NoError(t, err) + + unsealedCid, err := commcid.DataCommitmentV1ToCID(commd[:]) + require.NoError(t, err) + + require.Equal(t, s.cids.Sealed, sealCid) + require.Equal(t, s.cids.Unsealed, unsealedCid) +} + func TestGenerateSDR(t *testing.T) { d := t.TempDir() diff --git a/storage/sealer/manager_test.go b/storage/sealer/manager_test.go index d76424d5e..bd03cd097 100644 --- a/storage/sealer/manager_test.go +++ b/storage/sealer/manager_test.go @@ -648,7 +648,7 @@ func TestRestartWorker(t *testing.T) { wds := syncds.MutexWrap(datastore.NewMapDatastore()) arch := make(chan chan apres) - w := newLocalWorker(func() (storiface.Storage, error) { + w := NewLocalWorkerWithExecutor(func(_ *LocalWorker) (storiface.Storage, error) { return &testExec{apch: arch}, nil }, WorkerConfig{ TaskTypes: localTasks, @@ -685,7 +685,7 @@ func TestRestartWorker(t *testing.T) { } // restart the worker - w = newLocalWorker(func() (storiface.Storage, error) { + w = NewLocalWorkerWithExecutor(func(_ *LocalWorker) (storiface.Storage, error) { return &testExec{apch: arch}, nil }, WorkerConfig{ TaskTypes: localTasks, @@ -721,7 +721,7 @@ func TestReenableWorker(t *testing.T) { wds := datastore.NewMapDatastore() arch := make(chan chan apres) - w := newLocalWorker(func() (storiface.Storage, error) { + w := NewLocalWorkerWithExecutor(func(_ *LocalWorker) (storiface.Storage, error) { return &testExec{apch: arch}, nil }, WorkerConfig{ TaskTypes: localTasks, @@ -794,7 +794,7 @@ func TestResUse(t *testing.T) { wds := syncds.MutexWrap(datastore.NewMapDatastore()) arch := make(chan chan apres) - w := newLocalWorker(func() (storiface.Storage, error) { + w := NewLocalWorkerWithExecutor(func(_ *LocalWorker) (storiface.Storage, error) { return &testExec{apch: arch}, nil }, WorkerConfig{ TaskTypes: localTasks, @@ -852,7 +852,7 @@ func TestResOverride(t *testing.T) { wds := syncds.MutexWrap(datastore.NewMapDatastore()) arch := make(chan chan apres) - w := newLocalWorker(func() (storiface.Storage, error) { + w := NewLocalWorkerWithExecutor(func(_ *LocalWorker) (storiface.Storage, error) { return &testExec{apch: arch}, nil }, WorkerConfig{ TaskTypes: localTasks, diff --git a/storage/sealer/piece_provider_test.go b/storage/sealer/piece_provider_test.go index a8c243379..2acea47a5 100644 --- a/storage/sealer/piece_provider_test.go +++ b/storage/sealer/piece_provider_test.go @@ -289,7 +289,7 @@ func (p *pieceProviderTestHarness) addRemoteWorker(t *testing.T, tasks []sealtas dstore := ds_sync.MutexWrap(datastore.NewMapDatastore()) csts := statestore.New(namespace.Wrap(dstore, datastore.NewKey("/stmgr/calls"))) - worker := newLocalWorker(nil, WorkerConfig{ + worker := NewLocalWorkerWithExecutor(nil, WorkerConfig{ TaskTypes: tasks, }, os.LookupEnv, remote, localStore, p.index, p.mgr, csts) diff --git a/storage/sealer/worker_local.go b/storage/sealer/worker_local.go index 67510566c..84de60f9c 100644 --- a/storage/sealer/worker_local.go +++ b/storage/sealer/worker_local.go @@ -47,7 +47,7 @@ type WorkerConfig struct { } // used do provide custom proofs impl (mostly used in testing) -type ExecutorFunc func() (storiface.Storage, error) +type ExecutorFunc func(w *LocalWorker) (storiface.Storage, error) type EnvFunc func(string) (string, bool) type LocalWorker struct { @@ -77,7 +77,7 @@ type LocalWorker struct { closing chan struct{} } -func newLocalWorker(executor ExecutorFunc, wcfg WorkerConfig, envLookup EnvFunc, store paths.Store, local *paths.Local, sindex paths.SectorIndex, ret storiface.WorkerReturn, cst *statestore.StateStore) *LocalWorker { +func NewLocalWorkerWithExecutor(executor ExecutorFunc, wcfg WorkerConfig, envLookup EnvFunc, store paths.Store, local *paths.Local, sindex paths.SectorIndex, ret storiface.WorkerReturn, cst *statestore.StateStore) *LocalWorker { acceptTasks := map[sealtasks.TaskType]struct{}{} for _, taskType := range wcfg.TaskTypes { acceptTasks[taskType] = struct{}{} @@ -116,7 +116,7 @@ func newLocalWorker(executor ExecutorFunc, wcfg WorkerConfig, envLookup EnvFunc, } if w.executor == nil { - w.executor = w.ffiExec + w.executor = FFIExec() } unfinished, err := w.ct.unfinished() @@ -143,7 +143,7 @@ func newLocalWorker(executor ExecutorFunc, wcfg WorkerConfig, envLookup EnvFunc, } func NewLocalWorker(wcfg WorkerConfig, store paths.Store, local *paths.Local, sindex paths.SectorIndex, ret storiface.WorkerReturn, cst *statestore.StateStore) *LocalWorker { - return newLocalWorker(nil, wcfg, os.LookupEnv, store, local, sindex, ret, cst) + return NewLocalWorkerWithExecutor(nil, wcfg, os.LookupEnv, store, local, sindex, ret, cst) } type localWorkerPathProvider struct { @@ -180,12 +180,14 @@ func (l *localWorkerPathProvider) AcquireSector(ctx context.Context, sector stor }, nil } -func (l *localWorkerPathProvider) AcquireSectorCopy(ctx context.Context, id storiface.SectorRef, existing storiface.SectorFileType, allocate storiface.SectorFileType, ptype storiface.PathType) (storiface.SectorPaths, func(), error) { - return (&localWorkerPathProvider{w: l.w, op: storiface.AcquireCopy}).AcquireSector(ctx, id, existing, allocate, ptype) +func FFIExec(opts ...ffiwrapper.FFIWrapperOpt) func(l *LocalWorker) (storiface.Storage, error) { + return func(l *LocalWorker) (storiface.Storage, error) { + return ffiwrapper.New(&localWorkerPathProvider{w: l}, opts...) + } } -func (l *LocalWorker) ffiExec() (storiface.Storage, error) { - return ffiwrapper.New(&localWorkerPathProvider{w: l}) +func (l *localWorkerPathProvider) AcquireSectorCopy(ctx context.Context, id storiface.SectorRef, existing storiface.SectorFileType, allocate storiface.SectorFileType, ptype storiface.PathType) (storiface.SectorPaths, func(), error) { + return (&localWorkerPathProvider{w: l.w, op: storiface.AcquireCopy}).AcquireSector(ctx, id, existing, allocate, ptype) } type ReturnType string @@ -343,7 +345,7 @@ func doReturn(ctx context.Context, rt ReturnType, ci storiface.CallID, ret stori } func (l *LocalWorker) NewSector(ctx context.Context, sector storiface.SectorRef) error { - sb, err := l.executor() + sb, err := l.executor(l) if err != nil { return err } @@ -352,7 +354,7 @@ func (l *LocalWorker) NewSector(ctx context.Context, sector storiface.SectorRef) } func (l *LocalWorker) DataCid(ctx context.Context, pieceSize abi.UnpaddedPieceSize, pieceData storiface.Data) (storiface.CallID, error) { - sb, err := l.executor() + sb, err := l.executor(l) if err != nil { return storiface.UndefCall, err } @@ -363,7 +365,7 @@ func (l *LocalWorker) DataCid(ctx context.Context, pieceSize abi.UnpaddedPieceSi } func (l *LocalWorker) AddPiece(ctx context.Context, sector storiface.SectorRef, epcs []abi.UnpaddedPieceSize, sz abi.UnpaddedPieceSize, r io.Reader) (storiface.CallID, error) { - sb, err := l.executor() + sb, err := l.executor(l) if err != nil { return storiface.UndefCall, err } @@ -398,7 +400,7 @@ func (l *LocalWorker) SealPreCommit1(ctx context.Context, sector storiface.Secto } } - sb, err := l.executor() + sb, err := l.executor(l) if err != nil { return nil, err } @@ -408,7 +410,7 @@ func (l *LocalWorker) SealPreCommit1(ctx context.Context, sector storiface.Secto } func (l *LocalWorker) SealPreCommit2(ctx context.Context, sector storiface.SectorRef, phase1Out storiface.PreCommit1Out) (storiface.CallID, error) { - sb, err := l.executor() + sb, err := l.executor(l) if err != nil { return storiface.UndefCall, err } @@ -419,7 +421,7 @@ func (l *LocalWorker) SealPreCommit2(ctx context.Context, sector storiface.Secto } func (l *LocalWorker) SealCommit1(ctx context.Context, sector storiface.SectorRef, ticket abi.SealRandomness, seed abi.InteractiveSealRandomness, pieces []abi.PieceInfo, cids storiface.SectorCids) (storiface.CallID, error) { - sb, err := l.executor() + sb, err := l.executor(l) if err != nil { return storiface.UndefCall, err } @@ -430,7 +432,7 @@ func (l *LocalWorker) SealCommit1(ctx context.Context, sector storiface.SectorRe } func (l *LocalWorker) SealCommit2(ctx context.Context, sector storiface.SectorRef, phase1Out storiface.Commit1Out) (storiface.CallID, error) { - sb, err := l.executor() + sb, err := l.executor(l) if err != nil { return storiface.UndefCall, err } @@ -441,7 +443,7 @@ func (l *LocalWorker) SealCommit2(ctx context.Context, sector storiface.SectorRe } func (l *LocalWorker) ReplicaUpdate(ctx context.Context, sector storiface.SectorRef, pieces []abi.PieceInfo) (storiface.CallID, error) { - sb, err := l.executor() + sb, err := l.executor(l) if err != nil { return storiface.UndefCall, err } @@ -453,7 +455,7 @@ func (l *LocalWorker) ReplicaUpdate(ctx context.Context, sector storiface.Sector } func (l *LocalWorker) ProveReplicaUpdate1(ctx context.Context, sector storiface.SectorRef, sectorKey, newSealed, newUnsealed cid.Cid) (storiface.CallID, error) { - sb, err := l.executor() + sb, err := l.executor(l) if err != nil { return storiface.UndefCall, err } @@ -464,7 +466,7 @@ func (l *LocalWorker) ProveReplicaUpdate1(ctx context.Context, sector storiface. } func (l *LocalWorker) ProveReplicaUpdate2(ctx context.Context, sector storiface.SectorRef, sectorKey, newSealed, newUnsealed cid.Cid, vanillaProofs storiface.ReplicaVanillaProofs) (storiface.CallID, error) { - sb, err := l.executor() + sb, err := l.executor(l) if err != nil { return storiface.UndefCall, err } @@ -475,7 +477,7 @@ func (l *LocalWorker) ProveReplicaUpdate2(ctx context.Context, sector storiface. } func (l *LocalWorker) GenerateSectorKeyFromData(ctx context.Context, sector storiface.SectorRef, commD cid.Cid) (storiface.CallID, error) { - sb, err := l.executor() + sb, err := l.executor(l) if err != nil { return storiface.UndefCall, err } @@ -486,7 +488,7 @@ func (l *LocalWorker) GenerateSectorKeyFromData(ctx context.Context, sector stor } func (l *LocalWorker) FinalizeSector(ctx context.Context, sector storiface.SectorRef) (storiface.CallID, error) { - sb, err := l.executor() + sb, err := l.executor(l) if err != nil { return storiface.UndefCall, err } @@ -497,7 +499,7 @@ func (l *LocalWorker) FinalizeSector(ctx context.Context, sector storiface.Secto } func (l *LocalWorker) FinalizeReplicaUpdate(ctx context.Context, sector storiface.SectorRef) (storiface.CallID, error) { - sb, err := l.executor() + sb, err := l.executor(l) if err != nil { return storiface.UndefCall, err } @@ -508,7 +510,7 @@ func (l *LocalWorker) FinalizeReplicaUpdate(ctx context.Context, sector storifac } func (l *LocalWorker) ReleaseUnsealed(ctx context.Context, sector storiface.SectorRef, keepUnsealed []storiface.Range) (storiface.CallID, error) { - sb, err := l.executor() + sb, err := l.executor(l) if err != nil { return storiface.UndefCall, err } @@ -564,7 +566,7 @@ func (l *LocalWorker) MoveStorage(ctx context.Context, sector storiface.SectorRe } func (l *LocalWorker) UnsealPiece(ctx context.Context, sector storiface.SectorRef, index storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize, randomness abi.SealRandomness, cid cid.Cid) (storiface.CallID, error) { - sb, err := l.executor() + sb, err := l.executor(l) if err != nil { return storiface.UndefCall, err } @@ -591,7 +593,7 @@ func (l *LocalWorker) UnsealPiece(ctx context.Context, sector storiface.SectorRe } func (l *LocalWorker) DownloadSectorData(ctx context.Context, sector storiface.SectorRef, finalized bool, src map[storiface.SectorFileType]storiface.SectorLocation) (storiface.CallID, error) { - sb, err := l.executor() + sb, err := l.executor(l) if err != nil { return storiface.UndefCall, err } @@ -602,7 +604,7 @@ func (l *LocalWorker) DownloadSectorData(ctx context.Context, sector storiface.S } func (l *LocalWorker) GenerateWinningPoSt(ctx context.Context, ppt abi.RegisteredPoStProof, mid abi.ActorID, sectors []storiface.PostSectorChallenge, randomness abi.PoStRandomness) ([]proof.PoStProof, error) { - sb, err := l.executor() + sb, err := l.executor(l) if err != nil { return nil, err } @@ -647,11 +649,7 @@ func (l *LocalWorker) GenerateWinningPoSt(ctx context.Context, ppt abi.Registere } func (l *LocalWorker) GenerateWindowPoSt(ctx context.Context, ppt abi.RegisteredPoStProof, mid abi.ActorID, sectors []storiface.PostSectorChallenge, partitionIdx int, randomness abi.PoStRandomness) (storiface.WindowPoStResult, error) { - return l.GenerateWindowPoStAdv(ctx, ppt, mid, sectors, partitionIdx, randomness, false) -} - -func (l *LocalWorker) GenerateWindowPoStAdv(ctx context.Context, ppt abi.RegisteredPoStProof, mid abi.ActorID, sectors []storiface.PostSectorChallenge, partitionIdx int, randomness abi.PoStRandomness, allowSkip bool) (storiface.WindowPoStResult, error) { - sb, err := l.executor() + sb, err := l.executor(l) if err != nil { return storiface.WindowPoStResult{}, err }