Merge pull request #11185 from filecoin-project/feat/snpc2

feat: worker: Support delegating precommit2 to external binary
This commit is contained in:
Phi-rjan 2023-12-01 12:32:26 +01:00 committed by GitHub
commit 2c00b5db3c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 492 additions and 45 deletions

View File

@ -308,7 +308,36 @@ var simplePreCommit2 = &cli.Command{
Name: "synthetic",
Usage: "generate synthetic PoRep proofs",
},
&cli.StringFlag{
Name: "external-pc2",
Usage: "command for computing PC2 externally",
},
},
Description: `Compute PreCommit2 inputs and seal a sector.
--external-pc2 can be used to compute the PreCommit2 inputs externally.
The flag behaves similarly to the related lotus-worker flag, using it in
lotus-bench may be useful for testing if the external PreCommit2 command is
invoked correctly.
The command will be called with a number of environment variables set:
* EXTSEAL_PC2_SECTOR_NUM: the sector number
* EXTSEAL_PC2_SECTOR_MINER: the miner id
* EXTSEAL_PC2_PROOF_TYPE: the proof type
* EXTSEAL_PC2_SECTOR_SIZE: the sector size in bytes
* EXTSEAL_PC2_CACHE: the path to the cache directory
* EXTSEAL_PC2_SEALED: the path to the sealed sector file (initialized with unsealed data by the caller)
* EXTSEAL_PC2_PC1OUT: output from rust-fil-proofs precommit1 phase (base64 encoded json)
The command is expected to:
* Create cache sc-02-data-tree-r* files
* Create cache sc-02-data-tree-c* files
* Create cache p_aux / t_aux files
* Transform the sealed file in place
Example invocation of lotus-bench as external executor:
'./lotus-bench simple precommit2 --sector-size $EXTSEAL_PC2_SECTOR_SIZE $EXTSEAL_PC2_SEALED $EXTSEAL_PC2_CACHE $EXTSEAL_PC2_PC1OUT'
`,
ArgsUsage: "[sealed] [cache] [pc1 out]",
Action: func(cctx *cli.Context) error {
ctx := cctx.Context
@ -333,7 +362,18 @@ var simplePreCommit2 = &cli.Command{
storiface.FTSealed: cctx.Args().Get(0),
storiface.FTCache: cctx.Args().Get(1),
}
sealer, err := ffiwrapper.New(pp)
var opts []ffiwrapper.FFIWrapperOpt
if cctx.IsSet("external-pc2") {
extSeal := ffiwrapper.ExternalSealer{
PreCommit2: ffiwrapper.MakeExternPrecommit2(cctx.String("external-pc2")),
}
opts = append(opts, ffiwrapper.WithExternalSealCalls(extSeal))
}
sealer, err := ffiwrapper.New(pp, opts...)
if err != nil {
return err
}

View File

@ -39,6 +39,7 @@ import (
"github.com/filecoin-project/lotus/node/repo"
"github.com/filecoin-project/lotus/storage/paths"
"github.com/filecoin-project/lotus/storage/sealer"
"github.com/filecoin-project/lotus/storage/sealer/ffiwrapper"
"github.com/filecoin-project/lotus/storage/sealer/sealtasks"
"github.com/filecoin-project/lotus/storage/sealer/storiface"
)
@ -284,7 +285,36 @@ var runCmd = &cli.Command{
Value: true,
DefaultText: "inherits --addpiece",
},
&cli.StringFlag{
Name: "external-pc2",
Usage: "command for computing PC2 externally",
},
},
Description: `Run lotus-worker.
--external-pc2 can be used to compute the PreCommit2 inputs externally.
The flag behaves similarly to the related lotus-worker flag, using it in
lotus-bench may be useful for testing if the external PreCommit2 command is
invoked correctly.
The command will be called with a number of environment variables set:
* EXTSEAL_PC2_SECTOR_NUM: the sector number
* EXTSEAL_PC2_SECTOR_MINER: the miner id
* EXTSEAL_PC2_PROOF_TYPE: the proof type
* EXTSEAL_PC2_SECTOR_SIZE: the sector size in bytes
* EXTSEAL_PC2_CACHE: the path to the cache directory
* EXTSEAL_PC2_SEALED: the path to the sealed sector file (initialized with unsealed data by the caller)
* EXTSEAL_PC2_PC1OUT: output from rust-fil-proofs precommit1 phase (base64 encoded json)
The command is expected to:
* Create cache sc-02-data-tree-r* files
* Create cache sc-02-data-tree-c* files
* Create cache p_aux / t_aux files
* Transform the sealed file in place
Example invocation of lotus-bench as external executor:
'./lotus-bench simple precommit2 --sector-size $EXTSEAL_PC2_SECTOR_SIZE $EXTSEAL_PC2_SEALED $EXTSEAL_PC2_CACHE $EXTSEAL_PC2_PC1OUT'
`,
Before: func(cctx *cli.Context) error {
if cctx.IsSet("address") {
log.Warnf("The '--address' flag is deprecated, it has been replaced by '--listen'")
@ -623,18 +653,32 @@ var runCmd = &cli.Command{
fh.ServeHTTP(w, r)
}
// Parse ffi executor flags
var ffiOpts []ffiwrapper.FFIWrapperOpt
if cctx.IsSet("external-pc2") {
extSeal := ffiwrapper.ExternalSealer{
PreCommit2: ffiwrapper.MakeExternPrecommit2(cctx.String("external-pc2")),
}
ffiOpts = append(ffiOpts, ffiwrapper.WithExternalSealCalls(extSeal))
}
// Create / expose the worker
wsts := statestore.New(namespace.Wrap(ds, modules.WorkerCallsPrefix))
workerApi := &sealworker.Worker{
LocalWorker: sealer.NewLocalWorker(sealer.WorkerConfig{
TaskTypes: taskTypes,
NoSwap: cctx.Bool("no-swap"),
MaxParallelChallengeReads: cctx.Int("post-parallel-reads"),
ChallengeReadTimeout: cctx.Duration("post-read-timeout"),
Name: cctx.String("name"),
}, remote, localStore, nodeApi, nodeApi, wsts),
LocalWorker: sealer.NewLocalWorkerWithExecutor(
sealer.FFIExec(ffiOpts...),
sealer.WorkerConfig{
TaskTypes: taskTypes,
NoSwap: cctx.Bool("no-swap"),
MaxParallelChallengeReads: cctx.Int("post-parallel-reads"),
ChallengeReadTimeout: cctx.Duration("post-read-timeout"),
Name: cctx.String("name"),
}, os.LookupEnv, remote, localStore, nodeApi, nodeApi, wsts),
LocalStore: localStore,
Storage: lr,
}

View File

@ -34,6 +34,33 @@ NAME:
USAGE:
lotus-worker run [command options] [arguments...]
DESCRIPTION:
Run lotus-worker.
--external-pc2 can be used to compute the PreCommit2 inputs externally.
The flag behaves similarly to the related lotus-worker flag, using it in
lotus-bench may be useful for testing if the external PreCommit2 command is
invoked correctly.
The command will be called with a number of environment variables set:
* EXTSEAL_PC2_SECTOR_NUM: the sector number
* EXTSEAL_PC2_SECTOR_MINER: the miner id
* EXTSEAL_PC2_PROOF_TYPE: the proof type
* EXTSEAL_PC2_SECTOR_SIZE: the sector size in bytes
* EXTSEAL_PC2_CACHE: the path to the cache directory
* EXTSEAL_PC2_SEALED: the path to the sealed sector file (initialized with unsealed data by the caller)
* EXTSEAL_PC2_PC1OUT: output from rust-fil-proofs precommit1 phase (base64 encoded json)
The command is expected to:
* Create cache sc-02-data-tree-r* files
* Create cache sc-02-data-tree-c* files
* Create cache p_aux / t_aux files
* Transform the sealed file in place
Example invocation of lotus-bench as external executor:
'./lotus-bench simple precommit2 --sector-size $EXTSEAL_PC2_SECTOR_SIZE $EXTSEAL_PC2_SEALED $EXTSEAL_PC2_CACHE $EXTSEAL_PC2_PC1OUT'
OPTIONS:
--listen value host address and port the worker api will listen on (default: "0.0.0.0:3456") [$LOTUS_WORKER_LISTEN]
--no-local-storage don't use storageminer repo for sector storage (default: false) [$LOTUS_WORKER_NO_LOCAL_STORAGE]
@ -57,6 +84,7 @@ OPTIONS:
--timeout value used when 'listen' is unspecified. must be a valid duration recognized by golang's time.ParseDuration function (default: "30m") [$LOTUS_WORKER_TIMEOUT]
--http-server-timeout value (default: "30s")
--data-cid Run the data-cid task. true|false (default: inherits --addpiece)
--external-pc2 value command for computing PC2 externally
--help, -h show help
```

3
go.mod
View File

@ -136,6 +136,7 @@ require (
github.com/raulk/go-watchdog v1.3.0
github.com/stretchr/testify v1.8.4
github.com/syndtr/goleveldb v1.0.1-0.20210819022825-2ae1ddf74ef7
github.com/triplewz/poseidon v0.0.0-20220525065023-a7cdb0e183e7
github.com/urfave/cli/v2 v2.25.5
github.com/whyrusleeping/bencher v0.0.0-20190829221104-bb6607aa8bba
github.com/whyrusleeping/cbor-gen v0.0.0-20230923211252-36a87e1ba72f
@ -338,3 +339,5 @@ require (
replace github.com/filecoin-project/filecoin-ffi => ./extern/filecoin-ffi
replace github.com/filecoin-project/test-vectors => ./extern/test-vectors
replace github.com/triplewz/poseidon => github.com/magik6k/poseidon v0.0.0-neptune

5
go.sum
View File

@ -943,6 +943,8 @@ github.com/kr/pty v1.1.5/go.mod h1:9r2w37qlBe7rQ6e1fg1S/9xpWHSnaqNdHD3WcMdbPDA=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/leanovate/gopter v0.2.9 h1:fQjYxZaynp97ozCzfOyOuAGOU4aU/z37zf/tOujFk7c=
github.com/leanovate/gopter v0.2.9/go.mod h1:U2L/78B+KVFIx2VmW6onHJQzXtFb+p5y3y2Sh+Jxxv8=
github.com/libp2p/go-addr-util v0.0.1/go.mod h1:4ac6O7n9rIAKB1dnd+s8IbbMXkt+oBpzX4/+RACcnlQ=
github.com/libp2p/go-addr-util v0.0.2/go.mod h1:Ecd6Fb3yIuLzq4bD7VcywcVSBtefcAwnUISBM3WG15E=
github.com/libp2p/go-buffer-pool v0.0.1/go.mod h1:xtyIz9PMobb13WaxR6Zo1Pd1zXJKYg0a8KiIvDp3TzQ=
@ -1161,6 +1163,8 @@ github.com/lyft/protoc-gen-validate v0.0.13/go.mod h1:XbGvPuh87YZc5TdIa2/I4pLk0Q
github.com/magefile/mage v1.9.0 h1:t3AU2wNwehMCW97vuqQLtw6puppWXHO+O2MHo5a50XE=
github.com/magefile/mage v1.9.0/go.mod h1:z5UZb/iS3GoOSn0JgWuiw7dxlurVYTu+/jHXqQg881A=
github.com/magiconair/properties v1.8.0/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ=
github.com/magik6k/poseidon v0.0.0-neptune h1:Dfz15iiYGGE9Esvn8pZFlbiiCrHuyZDxm6LGXQfaf9c=
github.com/magik6k/poseidon v0.0.0-neptune/go.mod h1:QYG1d0B4YZD7TgF6qZndTTu4rxUGFCCZAQRDanDj+9c=
github.com/mailru/easyjson v0.0.0-20180823135443-60711f1a8329/go.mod h1:C1wdFJiN94OJF2b5HbByQZoLdCWB1Yqtg26g4irojpc=
github.com/mailru/easyjson v0.0.0-20190312143242-1de009706dbe/go.mod h1:C1wdFJiN94OJF2b5HbByQZoLdCWB1Yqtg26g4irojpc=
github.com/mailru/easyjson v0.0.0-20190614124828-94de47d64c63/go.mod h1:C1wdFJiN94OJF2b5HbByQZoLdCWB1Yqtg26g4irojpc=
@ -2016,6 +2020,7 @@ golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBc
golang.org/x/sys v0.0.0-20210514084401-e8d321eab015/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210603081109-ebe580a85c40/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210616094352-59db8d763f22/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210927094055-39ccf1dd6fa6/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=

View File

@ -0,0 +1,34 @@
package commitment
import (
"io"
"os"
"path/filepath"
)
const treeDFile = "sc-02-data-tree-d.dat"
// TreeDCommD reads CommD from tree-d
func TreeDCommD(cache string) ([32]byte, error) {
// Open the tree-d file for reading
file, err := os.Open(filepath.Join(cache, treeDFile))
if err != nil {
return [32]byte{}, err
}
defer file.Close() // nolint:errcheck
// Seek to 32 bytes from the end of the file
_, err = file.Seek(-32, io.SeekEnd)
if err != nil {
return [32]byte{}, err
}
// Read the last 32 bytes
var commD [32]byte
_, err = file.Read(commD[:])
if err != nil {
return [32]byte{}, err
}
return commD, nil
}

View File

@ -0,0 +1,64 @@
package commitment
import (
"math/big"
"os"
"path/filepath"
"github.com/triplewz/poseidon"
ff "github.com/triplewz/poseidon/bls12_381"
"golang.org/x/xerrors"
)
const pauxFile = "p_aux"
func CommR(commC, commRLast [32]byte) ([32]byte, error) {
// reverse commC and commRLast so that endianness is correct
for i, j := 0, len(commC)-1; i < j; i, j = i+1, j-1 {
commC[i], commC[j] = commC[j], commC[i]
commRLast[i], commRLast[j] = commRLast[j], commRLast[i]
}
input_a := new(big.Int)
input_a.SetBytes(commC[:])
input_b := new(big.Int)
input_b.SetBytes(commRLast[:])
input := []*big.Int{input_a, input_b}
cons, err := poseidon.GenPoseidonConstants(3)
if err != nil {
return [32]byte{}, err
}
h1, err := poseidon.Hash(input, cons, poseidon.OptimizedStatic)
if err != nil {
return [32]byte{}, err
}
h1element := new(ff.Element).SetBigInt(h1).Bytes()
// reverse the bytes so that endianness is correct
for i, j := 0, len(h1element)-1; i < j; i, j = i+1, j-1 {
h1element[i], h1element[j] = h1element[j], h1element[i]
}
return h1element, nil
}
// PAuxCommR reads p_aux and computes CommR
func PAuxCommR(cache string) ([32]byte, error) {
commCcommRLast, err := os.ReadFile(filepath.Join(cache, pauxFile))
if err != nil {
return [32]byte{}, err
}
if len(commCcommRLast) != 64 {
return [32]byte{}, xerrors.Errorf("invalid commCcommRLast length %d", len(commCcommRLast))
}
var commC, commRLast [32]byte
copy(commC[:], commCcommRLast[:32])
copy(commRLast[:], commCcommRLast[32:])
return CommR(commC, commRLast)
}

View File

@ -0,0 +1,35 @@
package commitment
import (
"testing"
"github.com/stretchr/testify/require"
)
func TestCommR(t *testing.T) {
var commC = [32]byte{
0x09, 0x1e, 0x07, 0x3b, 0x98, 0x2f, 0x66, 0xf0,
0x13, 0xc0, 0x26, 0xda, 0x6e, 0x54, 0xd8, 0x7d,
0xbf, 0x8b, 0xba, 0x84, 0x8e, 0xf5, 0x7a, 0x55,
0x29, 0xc7, 0xe7, 0xf7, 0x2c, 0x82, 0x88, 0x43,
}
var commRLast = [32]byte{
0xf0, 0xc5, 0x78, 0x5c, 0x6c, 0x8c, 0xf6, 0x2d,
0x96, 0x8b, 0x1e, 0xcd, 0x68, 0xed, 0xb9, 0xd9,
0x1e, 0xb9, 0x44, 0x5c, 0x78, 0x58, 0xa6, 0x00,
0x26, 0xf8, 0x82, 0x68, 0x60, 0xf7, 0xe7, 0x68,
}
res, err := CommR(commC, commRLast)
require.NoError(t, err)
var expected = [32]byte{
0xe6, 0x74, 0xd1, 0x9e, 0x6c, 0xe7, 0xfc, 0xf3,
0x3b, 0xbf, 0xd9, 0xb3, 0x43, 0xa0, 0xce, 0xb1,
0x2d, 0x28, 0x31, 0xd1, 0xda, 0x54, 0x31, 0x61,
0x89, 0x1e, 0xbc, 0xca, 0xd2, 0xc6, 0xdb, 0x01,
}
require.Equal(t, res, expected)
}

View File

@ -0,0 +1,88 @@
package ffiwrapper
import (
"context"
"encoding/base64"
"fmt"
"os"
"os/exec"
"github.com/ipfs/go-cid"
"golang.org/x/xerrors"
commcid "github.com/filecoin-project/go-fil-commcid"
"github.com/filecoin-project/lotus/storage/sealer/commitment"
"github.com/filecoin-project/lotus/storage/sealer/storiface"
)
// MakeExternPrecommit2 creates an implementation of ExternPrecommit2 backed by
// an external command specified by the command string.
//
// The command will be called with a number of environment variables set:
// * EXTSEAL_PC2_SECTOR_NUM: the sector number
// * EXTSEAL_PC2_SECTOR_MINER: the miner id
// * EXTSEAL_PC2_PROOF_TYPE: the proof type
// * EXTSEAL_PC2_SECTOR_SIZE: the sector size in bytes
// * EXTSEAL_PC2_CACHE: the path to the cache directory
// * EXTSEAL_PC2_SEALED: the path to the sealed sector file (initialized with unsealed data by the caller)
// * EXTSEAL_PC2_PC1OUT: output from rust-fil-proofs precommit1 phase (base64 encoded json)
//
// The command is expected to:
// * Create cache sc-02-data-tree-r* files
// * Create cache sc-02-data-tree-c* files
// * Create cache p_aux / t_aux files
// * Transform the sealed file in place
func MakeExternPrecommit2(command string) ExternPrecommit2 {
return func(ctx context.Context, sector storiface.SectorRef, cache, sealed string, pc1out storiface.PreCommit1Out) (sealedCID cid.Cid, unsealedCID cid.Cid, err error) {
ssize, err := sector.ProofType.SectorSize()
if err != nil {
return cid.Undef, cid.Undef, err
}
// Set environment variables for the external command
env := []string{
"EXTSEAL_PC2_SECTOR_NUM=" + sector.ID.Number.String(),
"EXTSEAL_PC2_SECTOR_MINER=" + sector.ID.Miner.String(),
"EXTSEAL_PC2_PROOF_TYPE=" + fmt.Sprintf("%d", sector.ProofType),
"EXTSEAL_PC2_SECTOR_SIZE=" + fmt.Sprintf("%d", ssize),
"EXTSEAL_PC2_CACHE=" + cache,
"EXTSEAL_PC2_SEALED=" + sealed,
"EXTSEAL_PC2_PC1OUT=" + base64.StdEncoding.EncodeToString(pc1out),
}
log.Infow("running external sealing call", "method", "precommit2", "command", command, "env", env)
// Create and run the external command
cmd := exec.CommandContext(ctx, "sh", "-c", command)
cmd.Env = env
cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr
if err := cmd.Run(); err != nil {
return cid.Cid{}, cid.Cid{}, xerrors.Errorf("external command error: %w", err)
}
commr, err := commitment.PAuxCommR(cache)
if err != nil {
return cid.Undef, cid.Undef, xerrors.Errorf("reading p_aux: %w", err)
}
sealedCID, err = commcid.ReplicaCommitmentV1ToCID(commr[:])
if err != nil {
return cid.Cid{}, cid.Cid{}, err
}
commd, err := commitment.TreeDCommD(cache)
if err != nil {
return cid.Undef, cid.Undef, xerrors.Errorf("reading CommD from tree-d: %w", err)
}
unsealedCID, err = commcid.DataCommitmentV1ToCID(commd[:])
if err != nil {
return cid.Cid{}, cid.Cid{}, err
}
return sealedCID, unsealedCID, nil
}
}

View File

@ -1,13 +1,28 @@
package ffiwrapper
import (
"context"
"github.com/ipfs/go-cid"
logging "github.com/ipfs/go-log/v2"
"github.com/filecoin-project/lotus/storage/sealer/storiface"
)
var log = logging.Logger("ffiwrapper")
type ExternPrecommit2 func(ctx context.Context, sector storiface.SectorRef, cache, sealed string, pc1out storiface.PreCommit1Out) (sealedCID cid.Cid, unsealedCID cid.Cid, err error)
type ExternalSealer struct {
PreCommit2 ExternPrecommit2
}
type Sealer struct {
sectors SectorProvider
sectors SectorProvider
// externCalls cointain overrides for calling alternative sealing logic
externCalls ExternalSealer
stopping chan struct{}
}

View File

@ -40,10 +40,30 @@ import (
var _ storiface.Storage = &Sealer{}
func New(sectors SectorProvider) (*Sealer, error) {
type FFIWrapperOpts struct {
ext ExternalSealer
}
type FFIWrapperOpt func(*FFIWrapperOpts)
func WithExternalSealCalls(ext ExternalSealer) FFIWrapperOpt {
return func(o *FFIWrapperOpts) {
o.ext = ext
}
}
func New(sectors SectorProvider, opts ...FFIWrapperOpt) (*Sealer, error) {
options := &FFIWrapperOpts{}
for _, o := range opts {
o(options)
}
sb := &Sealer{
sectors: sectors,
externCalls: options.ext,
stopping: make(chan struct{}),
}
@ -881,9 +901,18 @@ func (sb *Sealer) SealPreCommit2(ctx context.Context, sector storiface.SectorRef
}
defer done()
sealedCID, unsealedCID, err := ffi.SealPreCommitPhase2(phase1Out, paths.Cache, paths.Sealed)
if err != nil {
return storiface.SectorCids{}, xerrors.Errorf("presealing sector %d (%s): %w", sector.ID.Number, paths.Unsealed, err)
var sealedCID, unsealedCID cid.Cid
if sb.externCalls.PreCommit2 == nil {
sealedCID, unsealedCID, err = ffi.SealPreCommitPhase2(phase1Out, paths.Cache, paths.Sealed)
if err != nil {
return storiface.SectorCids{}, xerrors.Errorf("presealing sector %d (%s): %w", sector.ID.Number, paths.Unsealed, err)
}
} else {
sealedCID, unsealedCID, err = sb.externCalls.PreCommit2(ctx, sector, paths.Cache, paths.Sealed, phase1Out)
if err != nil {
return storiface.SectorCids{}, xerrors.Errorf("presealing sector (extern-pc2) %d (%s): %w", sector.ID.Number, paths.Unsealed, err)
}
}
ssize, err := sector.ProofType.SectorSize()

View File

@ -30,6 +30,7 @@ import (
"github.com/filecoin-project/lotus/chain/actors/policy"
"github.com/filecoin-project/lotus/storage/pipeline/lib/nullreader"
"github.com/filecoin-project/lotus/storage/sealer/commitment"
"github.com/filecoin-project/lotus/storage/sealer/ffiwrapper/basicfs"
"github.com/filecoin-project/lotus/storage/sealer/storiface"
)
@ -1090,7 +1091,11 @@ func TestDCAPCloses(t *testing.T) {
}
func TestSealAndVerifySynth(t *testing.T) {
origSealProofType := sealProofType
sealProofType = abi.RegisteredSealProof_StackedDrg2KiBV1_1_Feat_SyntheticPoRep
t.Cleanup(func() {
sealProofType = origSealProofType
})
if testing.Short() {
t.Skip("skipping test in short mode")
@ -1211,6 +1216,61 @@ func (c *closeAssertReader) Close() error {
var _ io.Closer = &closeAssertReader{}
func TestSealCommDRInGo(t *testing.T) {
if testing.Short() {
t.Skip("skipping test in short mode")
}
defer requireFDsClosed(t, openFDs(t))
cdir, err := os.MkdirTemp("", "sbtest-c-")
require.NoError(t, err)
miner := abi.ActorID(123)
sp := &basicfs.Provider{
Root: cdir,
}
sb, err := New(sp)
require.NoError(t, err)
t.Cleanup(func() {
if t.Failed() {
fmt.Printf("not removing %s\n", cdir)
return
}
if err := os.RemoveAll(cdir); err != nil {
t.Error(err)
}
})
si := storiface.SectorRef{
ID: abi.SectorID{Miner: miner, Number: 1},
ProofType: sealProofType,
}
s := seal{ref: si}
s.precommit(t, sb, si, func() {})
p, _, err := sp.AcquireSector(context.Background(), si, storiface.FTCache, storiface.FTNone, storiface.PathStorage)
require.NoError(t, err)
commr, err := commitment.PAuxCommR(p.Cache)
require.NoError(t, err)
commd, err := commitment.TreeDCommD(p.Cache)
require.NoError(t, err)
sealCid, err := commcid.ReplicaCommitmentV1ToCID(commr[:])
require.NoError(t, err)
unsealedCid, err := commcid.DataCommitmentV1ToCID(commd[:])
require.NoError(t, err)
require.Equal(t, s.cids.Sealed, sealCid)
require.Equal(t, s.cids.Unsealed, unsealedCid)
}
func TestGenerateSDR(t *testing.T) {
d := t.TempDir()

View File

@ -648,7 +648,7 @@ func TestRestartWorker(t *testing.T) {
wds := syncds.MutexWrap(datastore.NewMapDatastore())
arch := make(chan chan apres)
w := newLocalWorker(func() (storiface.Storage, error) {
w := NewLocalWorkerWithExecutor(func(_ *LocalWorker) (storiface.Storage, error) {
return &testExec{apch: arch}, nil
}, WorkerConfig{
TaskTypes: localTasks,
@ -685,7 +685,7 @@ func TestRestartWorker(t *testing.T) {
}
// restart the worker
w = newLocalWorker(func() (storiface.Storage, error) {
w = NewLocalWorkerWithExecutor(func(_ *LocalWorker) (storiface.Storage, error) {
return &testExec{apch: arch}, nil
}, WorkerConfig{
TaskTypes: localTasks,
@ -721,7 +721,7 @@ func TestReenableWorker(t *testing.T) {
wds := datastore.NewMapDatastore()
arch := make(chan chan apres)
w := newLocalWorker(func() (storiface.Storage, error) {
w := NewLocalWorkerWithExecutor(func(_ *LocalWorker) (storiface.Storage, error) {
return &testExec{apch: arch}, nil
}, WorkerConfig{
TaskTypes: localTasks,
@ -794,7 +794,7 @@ func TestResUse(t *testing.T) {
wds := syncds.MutexWrap(datastore.NewMapDatastore())
arch := make(chan chan apres)
w := newLocalWorker(func() (storiface.Storage, error) {
w := NewLocalWorkerWithExecutor(func(_ *LocalWorker) (storiface.Storage, error) {
return &testExec{apch: arch}, nil
}, WorkerConfig{
TaskTypes: localTasks,
@ -852,7 +852,7 @@ func TestResOverride(t *testing.T) {
wds := syncds.MutexWrap(datastore.NewMapDatastore())
arch := make(chan chan apres)
w := newLocalWorker(func() (storiface.Storage, error) {
w := NewLocalWorkerWithExecutor(func(_ *LocalWorker) (storiface.Storage, error) {
return &testExec{apch: arch}, nil
}, WorkerConfig{
TaskTypes: localTasks,

View File

@ -286,7 +286,7 @@ func (p *pieceProviderTestHarness) addRemoteWorker(t *testing.T, tasks []sealtas
dstore := ds_sync.MutexWrap(datastore.NewMapDatastore())
csts := statestore.New(namespace.Wrap(dstore, datastore.NewKey("/stmgr/calls")))
worker := newLocalWorker(nil, WorkerConfig{
worker := NewLocalWorkerWithExecutor(nil, WorkerConfig{
TaskTypes: tasks,
}, os.LookupEnv, remote, localStore, p.index, p.mgr, csts)

View File

@ -47,7 +47,7 @@ type WorkerConfig struct {
}
// used do provide custom proofs impl (mostly used in testing)
type ExecutorFunc func() (storiface.Storage, error)
type ExecutorFunc func(w *LocalWorker) (storiface.Storage, error)
type EnvFunc func(string) (string, bool)
type LocalWorker struct {
@ -77,7 +77,7 @@ type LocalWorker struct {
closing chan struct{}
}
func newLocalWorker(executor ExecutorFunc, wcfg WorkerConfig, envLookup EnvFunc, store paths.Store, local *paths.Local, sindex paths.SectorIndex, ret storiface.WorkerReturn, cst *statestore.StateStore) *LocalWorker {
func NewLocalWorkerWithExecutor(executor ExecutorFunc, wcfg WorkerConfig, envLookup EnvFunc, store paths.Store, local *paths.Local, sindex paths.SectorIndex, ret storiface.WorkerReturn, cst *statestore.StateStore) *LocalWorker {
acceptTasks := map[sealtasks.TaskType]struct{}{}
for _, taskType := range wcfg.TaskTypes {
acceptTasks[taskType] = struct{}{}
@ -116,7 +116,7 @@ func newLocalWorker(executor ExecutorFunc, wcfg WorkerConfig, envLookup EnvFunc,
}
if w.executor == nil {
w.executor = w.ffiExec
w.executor = FFIExec()
}
unfinished, err := w.ct.unfinished()
@ -143,7 +143,7 @@ func newLocalWorker(executor ExecutorFunc, wcfg WorkerConfig, envLookup EnvFunc,
}
func NewLocalWorker(wcfg WorkerConfig, store paths.Store, local *paths.Local, sindex paths.SectorIndex, ret storiface.WorkerReturn, cst *statestore.StateStore) *LocalWorker {
return newLocalWorker(nil, wcfg, os.LookupEnv, store, local, sindex, ret, cst)
return NewLocalWorkerWithExecutor(nil, wcfg, os.LookupEnv, store, local, sindex, ret, cst)
}
type localWorkerPathProvider struct {
@ -180,12 +180,14 @@ func (l *localWorkerPathProvider) AcquireSector(ctx context.Context, sector stor
}, nil
}
func (l *localWorkerPathProvider) AcquireSectorCopy(ctx context.Context, id storiface.SectorRef, existing storiface.SectorFileType, allocate storiface.SectorFileType, ptype storiface.PathType) (storiface.SectorPaths, func(), error) {
return (&localWorkerPathProvider{w: l.w, op: storiface.AcquireCopy}).AcquireSector(ctx, id, existing, allocate, ptype)
func FFIExec(opts ...ffiwrapper.FFIWrapperOpt) func(l *LocalWorker) (storiface.Storage, error) {
return func(l *LocalWorker) (storiface.Storage, error) {
return ffiwrapper.New(&localWorkerPathProvider{w: l}, opts...)
}
}
func (l *LocalWorker) ffiExec() (storiface.Storage, error) {
return ffiwrapper.New(&localWorkerPathProvider{w: l})
func (l *localWorkerPathProvider) AcquireSectorCopy(ctx context.Context, id storiface.SectorRef, existing storiface.SectorFileType, allocate storiface.SectorFileType, ptype storiface.PathType) (storiface.SectorPaths, func(), error) {
return (&localWorkerPathProvider{w: l.w, op: storiface.AcquireCopy}).AcquireSector(ctx, id, existing, allocate, ptype)
}
type ReturnType string
@ -343,7 +345,7 @@ func doReturn(ctx context.Context, rt ReturnType, ci storiface.CallID, ret stori
}
func (l *LocalWorker) NewSector(ctx context.Context, sector storiface.SectorRef) error {
sb, err := l.executor()
sb, err := l.executor(l)
if err != nil {
return err
}
@ -352,7 +354,7 @@ func (l *LocalWorker) NewSector(ctx context.Context, sector storiface.SectorRef)
}
func (l *LocalWorker) DataCid(ctx context.Context, pieceSize abi.UnpaddedPieceSize, pieceData storiface.Data) (storiface.CallID, error) {
sb, err := l.executor()
sb, err := l.executor(l)
if err != nil {
return storiface.UndefCall, err
}
@ -363,7 +365,7 @@ func (l *LocalWorker) DataCid(ctx context.Context, pieceSize abi.UnpaddedPieceSi
}
func (l *LocalWorker) AddPiece(ctx context.Context, sector storiface.SectorRef, epcs []abi.UnpaddedPieceSize, sz abi.UnpaddedPieceSize, r io.Reader) (storiface.CallID, error) {
sb, err := l.executor()
sb, err := l.executor(l)
if err != nil {
return storiface.UndefCall, err
}
@ -398,7 +400,7 @@ func (l *LocalWorker) SealPreCommit1(ctx context.Context, sector storiface.Secto
}
}
sb, err := l.executor()
sb, err := l.executor(l)
if err != nil {
return nil, err
}
@ -408,7 +410,7 @@ func (l *LocalWorker) SealPreCommit1(ctx context.Context, sector storiface.Secto
}
func (l *LocalWorker) SealPreCommit2(ctx context.Context, sector storiface.SectorRef, phase1Out storiface.PreCommit1Out) (storiface.CallID, error) {
sb, err := l.executor()
sb, err := l.executor(l)
if err != nil {
return storiface.UndefCall, err
}
@ -419,7 +421,7 @@ func (l *LocalWorker) SealPreCommit2(ctx context.Context, sector storiface.Secto
}
func (l *LocalWorker) SealCommit1(ctx context.Context, sector storiface.SectorRef, ticket abi.SealRandomness, seed abi.InteractiveSealRandomness, pieces []abi.PieceInfo, cids storiface.SectorCids) (storiface.CallID, error) {
sb, err := l.executor()
sb, err := l.executor(l)
if err != nil {
return storiface.UndefCall, err
}
@ -430,7 +432,7 @@ func (l *LocalWorker) SealCommit1(ctx context.Context, sector storiface.SectorRe
}
func (l *LocalWorker) SealCommit2(ctx context.Context, sector storiface.SectorRef, phase1Out storiface.Commit1Out) (storiface.CallID, error) {
sb, err := l.executor()
sb, err := l.executor(l)
if err != nil {
return storiface.UndefCall, err
}
@ -441,7 +443,7 @@ func (l *LocalWorker) SealCommit2(ctx context.Context, sector storiface.SectorRe
}
func (l *LocalWorker) ReplicaUpdate(ctx context.Context, sector storiface.SectorRef, pieces []abi.PieceInfo) (storiface.CallID, error) {
sb, err := l.executor()
sb, err := l.executor(l)
if err != nil {
return storiface.UndefCall, err
}
@ -453,7 +455,7 @@ func (l *LocalWorker) ReplicaUpdate(ctx context.Context, sector storiface.Sector
}
func (l *LocalWorker) ProveReplicaUpdate1(ctx context.Context, sector storiface.SectorRef, sectorKey, newSealed, newUnsealed cid.Cid) (storiface.CallID, error) {
sb, err := l.executor()
sb, err := l.executor(l)
if err != nil {
return storiface.UndefCall, err
}
@ -464,7 +466,7 @@ func (l *LocalWorker) ProveReplicaUpdate1(ctx context.Context, sector storiface.
}
func (l *LocalWorker) ProveReplicaUpdate2(ctx context.Context, sector storiface.SectorRef, sectorKey, newSealed, newUnsealed cid.Cid, vanillaProofs storiface.ReplicaVanillaProofs) (storiface.CallID, error) {
sb, err := l.executor()
sb, err := l.executor(l)
if err != nil {
return storiface.UndefCall, err
}
@ -475,7 +477,7 @@ func (l *LocalWorker) ProveReplicaUpdate2(ctx context.Context, sector storiface.
}
func (l *LocalWorker) GenerateSectorKeyFromData(ctx context.Context, sector storiface.SectorRef, commD cid.Cid) (storiface.CallID, error) {
sb, err := l.executor()
sb, err := l.executor(l)
if err != nil {
return storiface.UndefCall, err
}
@ -486,7 +488,7 @@ func (l *LocalWorker) GenerateSectorKeyFromData(ctx context.Context, sector stor
}
func (l *LocalWorker) FinalizeSector(ctx context.Context, sector storiface.SectorRef) (storiface.CallID, error) {
sb, err := l.executor()
sb, err := l.executor(l)
if err != nil {
return storiface.UndefCall, err
}
@ -497,7 +499,7 @@ func (l *LocalWorker) FinalizeSector(ctx context.Context, sector storiface.Secto
}
func (l *LocalWorker) FinalizeReplicaUpdate(ctx context.Context, sector storiface.SectorRef) (storiface.CallID, error) {
sb, err := l.executor()
sb, err := l.executor(l)
if err != nil {
return storiface.UndefCall, err
}
@ -508,7 +510,7 @@ func (l *LocalWorker) FinalizeReplicaUpdate(ctx context.Context, sector storifac
}
func (l *LocalWorker) ReleaseUnsealed(ctx context.Context, sector storiface.SectorRef, keepUnsealed []storiface.Range) (storiface.CallID, error) {
sb, err := l.executor()
sb, err := l.executor(l)
if err != nil {
return storiface.UndefCall, err
}
@ -564,7 +566,7 @@ func (l *LocalWorker) MoveStorage(ctx context.Context, sector storiface.SectorRe
}
func (l *LocalWorker) UnsealPiece(ctx context.Context, sector storiface.SectorRef, index storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize, randomness abi.SealRandomness, cid cid.Cid) (storiface.CallID, error) {
sb, err := l.executor()
sb, err := l.executor(l)
if err != nil {
return storiface.UndefCall, err
}
@ -591,7 +593,7 @@ func (l *LocalWorker) UnsealPiece(ctx context.Context, sector storiface.SectorRe
}
func (l *LocalWorker) DownloadSectorData(ctx context.Context, sector storiface.SectorRef, finalized bool, src map[storiface.SectorFileType]storiface.SectorLocation) (storiface.CallID, error) {
sb, err := l.executor()
sb, err := l.executor(l)
if err != nil {
return storiface.UndefCall, err
}
@ -602,7 +604,7 @@ func (l *LocalWorker) DownloadSectorData(ctx context.Context, sector storiface.S
}
func (l *LocalWorker) GenerateWinningPoSt(ctx context.Context, ppt abi.RegisteredPoStProof, mid abi.ActorID, sectors []storiface.PostSectorChallenge, randomness abi.PoStRandomness) ([]proof.PoStProof, error) {
sb, err := l.executor()
sb, err := l.executor(l)
if err != nil {
return nil, err
}
@ -647,7 +649,7 @@ func (l *LocalWorker) GenerateWinningPoSt(ctx context.Context, ppt abi.Registere
}
func (l *LocalWorker) GenerateWindowPoSt(ctx context.Context, ppt abi.RegisteredPoStProof, mid abi.ActorID, sectors []storiface.PostSectorChallenge, partitionIdx int, randomness abi.PoStRandomness) (storiface.WindowPoStResult, error) {
sb, err := l.executor()
sb, err := l.executor(l)
if err != nil {
return storiface.WindowPoStResult{}, err
}