WIP: Integrate FIP0013

This commit is contained in:
Łukasz Magiera 2021-03-10 16:16:44 +01:00 committed by Aayush Rajasekaran
parent b19d6cde11
commit 506f39b294
40 changed files with 783 additions and 133 deletions

View File

@ -80,6 +80,8 @@ type StorageMiner interface {
// SectorTerminatePending returns a list of pending sector terminations to be sent in the next batch message
SectorTerminatePending(ctx context.Context) ([]abi.SectorID, error) //perm:admin
SectorMarkForUpgrade(ctx context.Context, id abi.SectorNumber) error //perm:admin
SectorCommitFlush(ctx context.Context) (*cid.Cid, error) //perm:admin
SectorCommitPending(ctx context.Context) ([]abi.SectorID, error) //perm:admin
// WorkerConnect tells the node to connect to workers RPC
WorkerConnect(context.Context, string) error //perm:admin retry:true

View File

@ -639,6 +639,10 @@ type StorageMinerStruct struct {
SealingSchedDiag func(p0 context.Context, p1 bool) (interface{}, error) `perm:"admin"`
SectorCommitFlush func(p0 context.Context) (*cid.Cid, error) `perm:"admin"`
SectorCommitPending func(p0 context.Context) ([]abi.SectorID, error) `perm:"admin"`
SectorGetExpectedSealDuration func(p0 context.Context) (time.Duration, error) `perm:"read"`
SectorGetSealDelay func(p0 context.Context) (time.Duration, error) `perm:"read"`
@ -1923,6 +1927,14 @@ func (s *StorageMinerStruct) SealingSchedDiag(p0 context.Context, p1 bool) (inte
return s.Internal.SealingSchedDiag(p0, p1)
}
func (s *StorageMinerStruct) SectorCommitFlush(p0 context.Context) (*cid.Cid, error) {
return s.Internal.SectorCommitFlush(p0)
}
func (s *StorageMinerStruct) SectorCommitPending(p0 context.Context) ([]abi.SectorID, error) {
return s.Internal.SectorCommitPending(p0)
}
func (s *StorageMinerStruct) SectorGetExpectedSealDuration(p0 context.Context) (time.Duration, error) {
return s.Internal.SectorGetExpectedSealDuration(p0)
}

View File

@ -5,3 +5,7 @@ import rice "github.com/GeertJohan/go.rice"
func ParametersJSON() []byte {
return rice.MustFindBox("proof-params").MustBytes("parameters.json")
}
func SrsJSON() []byte {
return rice.MustFindBox("proof-params").MustBytes("srs-inner-product.json")
}

View File

@ -0,0 +1,7 @@
{
"v28-fil-inner-product-v1.srs": {
"cid": "Qmdq44DjcQnFfU3PJcdX7J49GCqcUYszr1TxMbHtAkvQ3g",
"digest": "ae20310138f5ba81451d723f858e3797",
"sector_size": 0
}
}

View File

@ -25,7 +25,7 @@ import (
"go.opencensus.io/trace"
"golang.org/x/xerrors"
proof2 "github.com/filecoin-project/specs-actors/v2/actors/runtime/proof"
proof5 "github.com/filecoin-project/specs-actors/v5/actors/runtime/proof"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/blockstore"
@ -51,7 +51,7 @@ const msgsPerBlock = 20
//nolint:deadcode,varcheck
var log = logging.Logger("gen")
var ValidWpostForTesting = []proof2.PoStProof{{
var ValidWpostForTesting = []proof5.PoStProof{{
ProofBytes: []byte("valid proof"),
}}
@ -460,7 +460,7 @@ func (cg *ChainGen) NextTipSetFromMinersWithMessages(base *types.TipSet, miners
func (cg *ChainGen) makeBlock(parents *types.TipSet, m address.Address, vrfticket *types.Ticket,
eticket *types.ElectionProof, bvals []types.BeaconEntry, height abi.ChainEpoch,
wpost []proof2.PoStProof, msgs []*types.SignedMessage) (*types.FullBlock, error) {
wpost []proof5.PoStProof, msgs []*types.SignedMessage) (*types.FullBlock, error) {
var ts uint64
if cg.Timestamper != nil {
@ -598,7 +598,7 @@ func (mca mca) WalletSign(ctx context.Context, a address.Address, v []byte) (*cr
type WinningPoStProver interface {
GenerateCandidates(context.Context, abi.PoStRandomness, uint64) ([]uint64, error)
ComputeProof(context.Context, []proof2.SectorInfo, abi.PoStRandomness) ([]proof2.PoStProof, error)
ComputeProof(context.Context, []proof5.SectorInfo, abi.PoStRandomness) ([]proof5.PoStProof, error)
}
type wppProvider struct{}
@ -607,7 +607,7 @@ func (wpp *wppProvider) GenerateCandidates(ctx context.Context, _ abi.PoStRandom
return []uint64{0}, nil
}
func (wpp *wppProvider) ComputeProof(context.Context, []proof2.SectorInfo, abi.PoStRandomness) ([]proof2.PoStProof, error) {
func (wpp *wppProvider) ComputeProof(context.Context, []proof5.SectorInfo, abi.PoStRandomness) ([]proof5.PoStProof, error) {
return ValidWpostForTesting, nil
}
@ -685,15 +685,23 @@ type genFakeVerifier struct{}
var _ ffiwrapper.Verifier = (*genFakeVerifier)(nil)
func (m genFakeVerifier) VerifySeal(svi proof2.SealVerifyInfo) (bool, error) {
func (m genFakeVerifier) VerifySeal(svi proof5.SealVerifyInfo) (bool, error) {
return true, nil
}
func (m genFakeVerifier) VerifyWinningPoSt(ctx context.Context, info proof2.WinningPoStVerifyInfo) (bool, error) {
func (m genFakeVerifier) VerifyAggregateSeals(aggregate proof5.AggregateSealVerifyProofAndInfos) (bool, error) {
panic("not supported")
}
func (m genFakeVerifier) VerifyWindowPoSt(ctx context.Context, info proof2.WindowPoStVerifyInfo) (bool, error) {
func (m genFakeVerifier) AggregateSealProofs(proofType abi.RegisteredSealProof, proofs [][]byte) ([]byte, error) {
panic("not supported")
}
func (m genFakeVerifier) VerifyWinningPoSt(ctx context.Context, info proof5.WinningPoStVerifyInfo) (bool, error) {
panic("not supported")
}
func (m genFakeVerifier) VerifyWindowPoSt(ctx context.Context, info proof5.WindowPoStVerifyInfo) (bool, error) {
panic("not supported")
}

View File

@ -27,7 +27,7 @@ import (
miner0 "github.com/filecoin-project/specs-actors/actors/builtin/miner"
power0 "github.com/filecoin-project/specs-actors/actors/builtin/power"
reward0 "github.com/filecoin-project/specs-actors/actors/builtin/reward"
runtime2 "github.com/filecoin-project/specs-actors/v2/actors/runtime"
runtime5 "github.com/filecoin-project/specs-actors/v5/actors/runtime"
"github.com/filecoin-project/lotus/chain/state"
"github.com/filecoin-project/lotus/chain/store"
@ -46,7 +46,7 @@ func MinerAddress(genesisIndex uint64) address.Address {
}
type fakedSigSyscalls struct {
runtime2.Syscalls
runtime5.Syscalls
}
func (fss *fakedSigSyscalls) VerifySignature(signature crypto.Signature, signer address.Address, plaintext []byte) error {
@ -54,7 +54,7 @@ func (fss *fakedSigSyscalls) VerifySignature(signature crypto.Signature, signer
}
func mkFakedSigSyscalls(base vm.SyscallBuilder) vm.SyscallBuilder {
return func(ctx context.Context, rt *vm.Runtime) runtime2.Syscalls {
return func(ctx context.Context, rt *vm.Runtime) runtime5.Syscalls {
return &fakedSigSyscalls{
base(ctx, rt),
}

View File

@ -9,8 +9,8 @@ import (
addr "github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/go-state-types/crypto"
vmr2 "github.com/filecoin-project/specs-actors/v2/actors/runtime"
proof2 "github.com/filecoin-project/specs-actors/v2/actors/runtime/proof"
vmr5 "github.com/filecoin-project/specs-actors/v5/actors/runtime"
proof5 "github.com/filecoin-project/specs-actors/v5/actors/runtime/proof"
"github.com/ipfs/go-cid"
)
@ -74,8 +74,9 @@ type Pricelist interface {
OnVerifySignature(sigType crypto.SigType, planTextSize int) (GasCharge, error)
OnHashing(dataSize int) GasCharge
OnComputeUnsealedSectorCid(proofType abi.RegisteredSealProof, pieces []abi.PieceInfo) GasCharge
OnVerifySeal(info proof2.SealVerifyInfo) GasCharge
OnVerifyPost(info proof2.WindowPoStVerifyInfo) GasCharge
OnVerifySeal(info proof5.SealVerifyInfo) GasCharge
OnVerifyAggregateSeals() GasCharge
OnVerifyPost(info proof5.WindowPoStVerifyInfo) GasCharge
OnVerifyConsensusFault() GasCharge
}
@ -111,6 +112,7 @@ var prices = map[abi.ChainEpoch]Pricelist{
hashingBase: 31355,
computeUnsealedSectorCidBase: 98647,
verifySealBase: 2000, // TODO gas , it VerifySeal syscall is not used
verifyAggregateSealBase: 0,
verifyPostLookup: map[abi.RegisteredPoStProof]scalingCost{
abi.RegisteredPoStProof_StackedDrgWindow512MiBV1: {
flat: 123861062,
@ -159,6 +161,7 @@ var prices = map[abi.ChainEpoch]Pricelist{
hashingBase: 31355,
computeUnsealedSectorCidBase: 98647,
verifySealBase: 2000, // TODO gas , it VerifySeal syscall is not used
verifyAggregateSealBase: 400_000_000, // TODO (~40ms, I think)
verifyPostLookup: map[abi.RegisteredPoStProof]scalingCost{
abi.RegisteredPoStProof_StackedDrgWindow512MiBV1: {
flat: 117680921,
@ -198,7 +201,7 @@ func PricelistByEpoch(epoch abi.ChainEpoch) Pricelist {
}
type pricedSyscalls struct {
under vmr2.Syscalls
under vmr5.Syscalls
pl Pricelist
chargeGas func(GasCharge)
}
@ -232,7 +235,7 @@ func (ps pricedSyscalls) ComputeUnsealedSectorCID(reg abi.RegisteredSealProof, p
}
// Verifies a sector seal proof.
func (ps pricedSyscalls) VerifySeal(vi proof2.SealVerifyInfo) error {
func (ps pricedSyscalls) VerifySeal(vi proof5.SealVerifyInfo) error {
ps.chargeGas(ps.pl.OnVerifySeal(vi))
defer ps.chargeGas(gasOnActorExec)
@ -240,7 +243,7 @@ func (ps pricedSyscalls) VerifySeal(vi proof2.SealVerifyInfo) error {
}
// Verifies a proof of spacetime.
func (ps pricedSyscalls) VerifyPoSt(vi proof2.WindowPoStVerifyInfo) error {
func (ps pricedSyscalls) VerifyPoSt(vi proof5.WindowPoStVerifyInfo) error {
ps.chargeGas(ps.pl.OnVerifyPost(vi))
defer ps.chargeGas(gasOnActorExec)
@ -257,14 +260,14 @@ func (ps pricedSyscalls) VerifyPoSt(vi proof2.WindowPoStVerifyInfo) error {
// the "parent grinding fault", in which case it must be the sibling of h1 (same parent tipset) and one of the
// blocks in the parent of h2 (i.e. h2's grandparent).
// Returns nil and an error if the headers don't prove a fault.
func (ps pricedSyscalls) VerifyConsensusFault(h1 []byte, h2 []byte, extra []byte) (*vmr2.ConsensusFault, error) {
func (ps pricedSyscalls) VerifyConsensusFault(h1 []byte, h2 []byte, extra []byte) (*vmr5.ConsensusFault, error) {
ps.chargeGas(ps.pl.OnVerifyConsensusFault())
defer ps.chargeGas(gasOnActorExec)
return ps.under.VerifyConsensusFault(h1, h2, extra)
}
func (ps pricedSyscalls) BatchVerifySeals(inp map[address.Address][]proof2.SealVerifyInfo) (map[address.Address][]bool, error) {
func (ps pricedSyscalls) BatchVerifySeals(inp map[address.Address][]proof5.SealVerifyInfo) (map[address.Address][]bool, error) {
count := int64(0)
for _, svis := range inp {
count += int64(len(svis))
@ -277,3 +280,10 @@ func (ps pricedSyscalls) BatchVerifySeals(inp map[address.Address][]proof2.SealV
return ps.under.BatchVerifySeals(inp)
}
func (ps pricedSyscalls) VerifyAggregateSeals(aggregate proof5.AggregateSealVerifyProofAndInfos) error {
ps.chargeGas(ps.pl.OnVerifyAggregateSeals())
defer ps.chargeGas(gasOnActorExec)
return ps.under.VerifyAggregateSeals(aggregate)
}

View File

@ -91,6 +91,7 @@ type pricelistV0 struct {
computeUnsealedSectorCidBase int64
verifySealBase int64
verifyAggregateSealBase int64
verifyPostLookup map[abi.RegisteredPoStProof]scalingCost
verifyPostDiscount bool
verifyConsensusFault int64
@ -185,6 +186,12 @@ func (pl *pricelistV0) OnVerifySeal(info proof2.SealVerifyInfo) GasCharge {
return newGasCharge("OnVerifySeal", pl.verifySealBase, 0)
}
// OnVerifyAggregateSeals
func (pl *pricelistV0) OnVerifyAggregateSeals() GasCharge {
// TODO: this needs more cost tunning
return newGasCharge("OnVerifyAggregateSeals", pl.verifyAggregateSealBase, 0)
}
// OnVerifyPost
func (pl *pricelistV0) OnVerifyPost(info proof2.WindowPoStVerifyInfo) GasCharge {
sectorSize := "unknown"

View File

@ -16,10 +16,10 @@ import (
exported0 "github.com/filecoin-project/specs-actors/actors/builtin/exported"
exported2 "github.com/filecoin-project/specs-actors/v2/actors/builtin/exported"
vmr "github.com/filecoin-project/specs-actors/v2/actors/runtime"
exported3 "github.com/filecoin-project/specs-actors/v3/actors/builtin/exported"
exported4 "github.com/filecoin-project/specs-actors/v4/actors/builtin/exported"
exported5 "github.com/filecoin-project/specs-actors/v5/actors/builtin/exported"
vmr "github.com/filecoin-project/specs-actors/v5/actors/runtime"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/go-state-types/exitcode"

View File

@ -16,7 +16,7 @@ import (
"github.com/filecoin-project/go-state-types/network"
rtt "github.com/filecoin-project/go-state-types/rt"
rt0 "github.com/filecoin-project/specs-actors/actors/runtime"
rt2 "github.com/filecoin-project/specs-actors/v2/actors/runtime"
rt5 "github.com/filecoin-project/specs-actors/v5/actors/runtime"
"github.com/ipfs/go-cid"
ipldcbor "github.com/ipfs/go-ipld-cbor"
"go.opencensus.io/trace"
@ -54,8 +54,8 @@ func (m *Message) ValueReceived() abi.TokenAmount {
var EnableGasTracing = false
type Runtime struct {
rt2.Message
rt2.Syscalls
rt5.Message
rt5.Syscalls
ctx context.Context
@ -136,7 +136,7 @@ func (rt *Runtime) StorePut(x cbor.Marshaler) cid.Cid {
}
var _ rt0.Runtime = (*Runtime)(nil)
var _ rt2.Runtime = (*Runtime)(nil)
var _ rt5.Runtime = (*Runtime)(nil)
func (rt *Runtime) shimCall(f func() interface{}) (rval []byte, aerr aerrors.ActorError) {
defer func() {

View File

@ -26,8 +26,8 @@ import (
"github.com/filecoin-project/lotus/extern/sector-storage/ffiwrapper"
"github.com/filecoin-project/lotus/lib/sigs"
runtime2 "github.com/filecoin-project/specs-actors/v2/actors/runtime"
proof2 "github.com/filecoin-project/specs-actors/v2/actors/runtime/proof"
runtime5 "github.com/filecoin-project/specs-actors/v5/actors/runtime"
proof5 "github.com/filecoin-project/specs-actors/v5/actors/runtime/proof"
)
func init() {
@ -36,10 +36,10 @@ func init() {
// Actual type is defined in chain/types/vmcontext.go because the VMContext interface is there
type SyscallBuilder func(ctx context.Context, rt *Runtime) runtime2.Syscalls
type SyscallBuilder func(ctx context.Context, rt *Runtime) runtime5.Syscalls
func Syscalls(verifier ffiwrapper.Verifier) SyscallBuilder {
return func(ctx context.Context, rt *Runtime) runtime2.Syscalls {
return func(ctx context.Context, rt *Runtime) runtime5.Syscalls {
return &syscallShim{
ctx: ctx,
@ -90,7 +90,7 @@ func (ss *syscallShim) HashBlake2b(data []byte) [32]byte {
// Checks validity of the submitted consensus fault with the two block headers needed to prove the fault
// and an optional extra one to check common ancestry (as needed).
// Note that the blocks are ordered: the method requires a.Epoch() <= b.Epoch().
func (ss *syscallShim) VerifyConsensusFault(a, b, extra []byte) (*runtime2.ConsensusFault, error) {
func (ss *syscallShim) VerifyConsensusFault(a, b, extra []byte) (*runtime5.ConsensusFault, error) {
// Note that block syntax is not validated. Any validly signed block will be accepted pursuant to the below conditions.
// Whether or not it could ever have been accepted in a chain is not checked/does not matter here.
// for that reason when checking block parent relationships, rather than instantiating a Tipset to do so
@ -133,14 +133,14 @@ func (ss *syscallShim) VerifyConsensusFault(a, b, extra []byte) (*runtime2.Conse
}
// (2) check for the consensus faults themselves
var consensusFault *runtime2.ConsensusFault
var consensusFault *runtime5.ConsensusFault
// (a) double-fork mining fault
if blockA.Height == blockB.Height {
consensusFault = &runtime2.ConsensusFault{
consensusFault = &runtime5.ConsensusFault{
Target: blockA.Miner,
Epoch: blockB.Height,
Type: runtime2.ConsensusFaultDoubleForkMining,
Type: runtime5.ConsensusFaultDoubleForkMining,
}
}
@ -148,10 +148,10 @@ func (ss *syscallShim) VerifyConsensusFault(a, b, extra []byte) (*runtime2.Conse
// strictly speaking no need to compare heights based on double fork mining check above,
// but at same height this would be a different fault.
if types.CidArrsEqual(blockA.Parents, blockB.Parents) && blockA.Height != blockB.Height {
consensusFault = &runtime2.ConsensusFault{
consensusFault = &runtime5.ConsensusFault{
Target: blockA.Miner,
Epoch: blockB.Height,
Type: runtime2.ConsensusFaultTimeOffsetMining,
Type: runtime5.ConsensusFaultTimeOffsetMining,
}
}
@ -171,10 +171,10 @@ func (ss *syscallShim) VerifyConsensusFault(a, b, extra []byte) (*runtime2.Conse
if types.CidArrsEqual(blockA.Parents, blockC.Parents) && blockA.Height == blockC.Height &&
types.CidArrsContains(blockB.Parents, blockC.Cid()) && !types.CidArrsContains(blockB.Parents, blockA.Cid()) {
consensusFault = &runtime2.ConsensusFault{
consensusFault = &runtime5.ConsensusFault{
Target: blockA.Miner,
Epoch: blockB.Height,
Type: runtime2.ConsensusFaultParentGrinding,
Type: runtime5.ConsensusFaultParentGrinding,
}
}
}
@ -243,7 +243,7 @@ func (ss *syscallShim) workerKeyAtLookback(height abi.ChainEpoch) (address.Addre
return ResolveToKeyAddr(ss.cstate, ss.cst, info.Worker)
}
func (ss *syscallShim) VerifyPoSt(proof proof2.WindowPoStVerifyInfo) error {
func (ss *syscallShim) VerifyPoSt(proof proof5.WindowPoStVerifyInfo) error {
ok, err := ss.verifier.VerifyWindowPoSt(context.TODO(), proof)
if err != nil {
return err
@ -254,7 +254,7 @@ func (ss *syscallShim) VerifyPoSt(proof proof2.WindowPoStVerifyInfo) error {
return nil
}
func (ss *syscallShim) VerifySeal(info proof2.SealVerifyInfo) error {
func (ss *syscallShim) VerifySeal(info proof5.SealVerifyInfo) error {
//_, span := trace.StartSpan(ctx, "ValidatePoRep")
//defer span.End()
@ -281,6 +281,18 @@ func (ss *syscallShim) VerifySeal(info proof2.SealVerifyInfo) error {
return nil
}
func (ss *syscallShim) VerifyAggregateSeals(aggregate proof5.AggregateSealVerifyProofAndInfos) error {
ok, err := ss.verifier.VerifyAggregateSeals(aggregate)
if err != nil {
return xerrors.Errorf("failed to verify aggregated PoRep: %w", err)
}
if !ok {
return fmt.Errorf("invalid aggredate proof")
}
return nil
}
func (ss *syscallShim) VerifySignature(sig crypto.Signature, addr address.Address, input []byte) error {
// TODO: in genesis setup, we are currently faking signatures
@ -294,7 +306,7 @@ func (ss *syscallShim) VerifySignature(sig crypto.Signature, addr address.Addres
var BatchSealVerifyParallelism = goruntime.NumCPU()
func (ss *syscallShim) BatchVerifySeals(inp map[address.Address][]proof2.SealVerifyInfo) (map[address.Address][]bool, error) {
func (ss *syscallShim) BatchVerifySeals(inp map[address.Address][]proof5.SealVerifyInfo) (map[address.Address][]bool, error) {
out := make(map[address.Address][]bool)
sema := make(chan struct{}, BatchSealVerifyParallelism)
@ -306,7 +318,7 @@ func (ss *syscallShim) BatchVerifySeals(inp map[address.Address][]proof2.SealVer
for i, s := range seals {
wg.Add(1)
go func(ma address.Address, ix int, svi proof2.SealVerifyInfo, res []bool) {
go func(ma address.Address, ix int, svi proof5.SealVerifyInfo, res []bool) {
defer wg.Done()
sema <- struct{}{}

View File

@ -23,7 +23,7 @@ var FetchParamCmd = &cli.Command{
}
sectorSize := uint64(sectorSizeInt)
err = paramfetch.GetParams(ReqContext(cctx), build.ParametersJSON(), sectorSize)
err = paramfetch.GetParams(ReqContext(cctx), build.ParametersJSON(), build.SrsJSON(), sectorSize)
if err != nil {
return xerrors.Errorf("fetching proof parameters: %w", err)
}

View File

@ -243,7 +243,7 @@ var sealBenchCmd = &cli.Command{
// Only fetch parameters if actually needed
skipc2 := c.Bool("skip-commit2")
if !skipc2 {
if err := paramfetch.GetParams(lcli.ReqContext(c), build.ParametersJSON(), uint64(sectorSize)); err != nil {
if err := paramfetch.GetParams(lcli.ReqContext(c), build.ParametersJSON(), build.SrsJSON(), uint64(sectorSize)); err != nil {
return xerrors.Errorf("getting params: %w", err)
}
}
@ -738,7 +738,7 @@ var proveCmd = &cli.Command{
return xerrors.Errorf("unmarshalling input file: %w", err)
}
if err := paramfetch.GetParams(lcli.ReqContext(c), build.ParametersJSON(), c2in.SectorSize); err != nil {
if err := paramfetch.GetParams(lcli.ReqContext(c), build.ParametersJSON(), build.SrsJSON(), c2in.SectorSize); err != nil {
return xerrors.Errorf("getting params: %w", err)
}

View File

@ -228,7 +228,7 @@ var runCmd = &cli.Command{
}
if cctx.Bool("commit") {
if err := paramfetch.GetParams(ctx, build.ParametersJSON(), uint64(ssize)); err != nil {
if err := paramfetch.GetParams(ctx, build.ParametersJSON(), build.SrsJSON(), uint64(ssize)); err != nil {
return xerrors.Errorf("get params: %w", err)
}
}

View File

@ -25,7 +25,7 @@ var fetchParamCmd = &cli.Command{
return err
}
sectorSize := uint64(sectorSizeInt)
err = paramfetch.GetParams(lcli.ReqContext(cctx), build.ParametersJSON(), sectorSize)
err = paramfetch.GetParams(lcli.ReqContext(cctx), build.ParametersJSON(), build.SrsJSON(), sectorSize)
if err != nil {
return xerrors.Errorf("fetching proof parameters: %w", err)
}

View File

@ -295,6 +295,8 @@ var stateList = []stateMeta{
{col: color.FgYellow, state: sealing.Committing},
{col: color.FgYellow, state: sealing.SubmitCommit},
{col: color.FgYellow, state: sealing.CommitWait},
{col: color.FgYellow, state: sealing.SubmitCommitAggregate},
{col: color.FgYellow, state: sealing.CommitAggregateWait},
{col: color.FgYellow, state: sealing.FinalizeSector},
{col: color.FgCyan, state: sealing.Terminating},

View File

@ -143,7 +143,7 @@ var initCmd = &cli.Command{
log.Info("Checking proof parameters")
if err := paramfetch.GetParams(ctx, build.ParametersJSON(), uint64(ssize)); err != nil {
if err := paramfetch.GetParams(ctx, build.ParametersJSON(), build.SrsJSON(), uint64(ssize)); err != nil {
return xerrors.Errorf("fetching proof parameters: %w", err)
}

View File

@ -249,7 +249,7 @@ var initRestoreCmd = &cli.Command{
log.Info("Checking proof parameters")
if err := paramfetch.GetParams(ctx, build.ParametersJSON(), uint64(mi.SectorSize)); err != nil {
if err := paramfetch.GetParams(ctx, build.ParametersJSON(), build.SrsJSON(), uint64(mi.SectorSize)); err != nil {
return xerrors.Errorf("fetching proof parameters: %w", err)
}

View File

@ -45,6 +45,7 @@ var sectorsCmd = &cli.Command{
sectorsStartSealCmd,
sectorsSealDelayCmd,
sectorsCapacityCollateralCmd,
sectorsPendingCommit,
},
}
@ -969,6 +970,53 @@ var sectorsUpdateCmd = &cli.Command{
},
}
var sectorsPendingCommit = &cli.Command{
Name: "pending-commit",
Usage: "list sectors waiting in batch queue",
Flags: []cli.Flag{
&cli.BoolFlag{
Name: "publish-now",
Usage: "send a batch now",
},
},
Action: func(cctx *cli.Context) error {
api, closer, err := lcli.GetStorageMinerAPI(cctx)
if err != nil {
return err
}
defer closer()
ctx := lcli.ReqContext(cctx)
if cctx.Bool("publish-now") {
cid, err := api.SectorCommitFlush(ctx)
if err != nil {
return xerrors.Errorf("flush: %w", err)
}
if cid == nil {
return xerrors.Errorf("no sectors to publish")
}
fmt.Println("sector batch published: ", cid)
return nil
}
pending, err := api.SectorCommitPending(ctx)
if err != nil {
return xerrors.Errorf("getting pending deals: %w", err)
}
if len(pending) > 0 {
for _, sector := range pending {
fmt.Println(sector.Number)
}
return nil
}
fmt.Println("No sectors queued to be committed")
return nil
},
}
func yesno(b bool) string {
if b {
return color.GreenString("YES")

View File

@ -231,7 +231,7 @@ var DaemonCmd = &cli.Command{
freshRepo := err != repo.ErrRepoExists
if !isLite {
if err := paramfetch.GetParams(lcli.ReqContext(cctx), build.ParametersJSON(), 0); err != nil {
if err := paramfetch.GetParams(lcli.ReqContext(cctx), build.ParametersJSON(), build.SrsJSON(), 0); err != nil {
return xerrors.Errorf("fetching proof parameters: %w", err)
}
}

View File

@ -98,6 +98,8 @@
* [SealingAbort](#SealingAbort)
* [SealingSchedDiag](#SealingSchedDiag)
* [Sector](#Sector)
* [SectorCommitFlush](#SectorCommitFlush)
* [SectorCommitPending](#SectorCommitPending)
* [SectorGetExpectedSealDuration](#SectorGetExpectedSealDuration)
* [SectorGetSealDelay](#SectorGetSealDelay)
* [SectorMarkForUpgrade](#SectorMarkForUpgrade)
@ -1556,6 +1558,24 @@ Response: `{}`
## Sector
### SectorCommitFlush
Perms: admin
Inputs: `null`
Response: `null`
### SectorCommitPending
Perms: admin
Inputs: `null`
Response: `null`
### SectorGetExpectedSealDuration
SectorGetExpectedSealDuration gets the expected time for a sector to seal

2
extern/filecoin-ffi vendored

@ -1 +1 @@
Subproject commit dc4e4e8dc9554dedb6f48304f7f0c6328331f9ec
Subproject commit 5f9f082b03a22bbf0f31adcb2bdf7f539cee6b6b

View File

@ -18,6 +18,7 @@ import (
commpffi "github.com/filecoin-project/go-commp-utils/ffiwrapper"
proof2 "github.com/filecoin-project/specs-actors/v2/actors/runtime/proof"
proof5 "github.com/filecoin-project/specs-actors/v5/actors/runtime/proof"
"github.com/ipfs/go-cid"
@ -83,9 +84,10 @@ func (s *seal) precommit(t *testing.T, sb *Sealer, id storage.SectorRef, done fu
s.cids = cids
}
func (s *seal) commit(t *testing.T, sb *Sealer, done func()) {
var seed = abi.InteractiveSealRandomness{0, 9, 8, 7, 6, 5, 4, 3, 2, 1, 0, 9, 8, 7, 6, 45, 3, 2, 1, 0, 9, 8, 7, 6, 5, 4, 3, 2, 1, 0, 9}
func (s *seal) commit(t *testing.T, sb *Sealer, done func()) storage.Proof {
defer done()
seed := abi.InteractiveSealRandomness{0, 9, 8, 7, 6, 5, 4, 3, 2, 1, 0, 9, 8, 7, 6, 45, 3, 2, 1, 0, 9, 8, 7, 6, 5, 4, 3, 2, 1, 0, 9}
pc1, err := sb.SealCommit1(context.TODO(), s.ref, s.ticket, seed, []abi.PieceInfo{s.pi}, s.cids)
if err != nil {
@ -112,6 +114,8 @@ func (s *seal) commit(t *testing.T, sb *Sealer, done func()) {
if !ok {
t.Fatal("proof failed to validate")
}
return proof
}
func (s *seal) unseal(t *testing.T, sb *Sealer, sp *basicfs.Provider, si storage.SectorRef, done func()) {
@ -229,7 +233,12 @@ func getGrothParamFileAndVerifyingKeys(s abi.SectorSize) {
panic(err)
}
err = paramfetch.GetParams(context.TODO(), dat, uint64(s))
datSrs, err := ioutil.ReadFile("../../../build/proof-params/srs-inner-product.json")
if err != nil {
panic(err)
}
err = paramfetch.GetParams(context.TODO(), dat, datSrs, uint64(s))
if err != nil {
panic(xerrors.Errorf("failed to acquire Groth parameters for 2KiB sectors: %w", err))
}
@ -462,6 +471,94 @@ func TestSealAndVerify3(t *testing.T) {
post(t, sb, []abi.SectorID{si1.ID, si2.ID}, s1, s2, s3)
}
func TestSealAndVerifyAggregate(t *testing.T) {
numAgg := 5
if testing.Short() {
t.Skip("skipping test in short mode")
}
defer requireFDsClosed(t, openFDs(t))
if runtime.NumCPU() < 10 && os.Getenv("CI") == "" { // don't bother on slow hardware
t.Skip("this is slow")
}
_ = os.Setenv("RUST_LOG", "info")
getGrothParamFileAndVerifyingKeys(sectorSize)
cdir, err := ioutil.TempDir("", "sbtest-c-")
if err != nil {
t.Fatal(err)
}
miner := abi.ActorID(123)
sp := &basicfs.Provider{
Root: cdir,
}
sb, err := New(sp)
if err != nil {
t.Fatalf("%+v", err)
}
cleanup := func() {
if t.Failed() {
fmt.Printf("not removing %s\n", cdir)
return
}
if err := os.RemoveAll(cdir); err != nil {
t.Error(err)
}
}
defer cleanup()
avi := proof5.AggregateSealVerifyProofAndInfos{
Miner: miner,
Infos: make([]proof5.AggregateSealVerifyInfo, numAgg),
}
toAggregate := make([][]byte, numAgg)
for i := 0; i < numAgg; i++ {
si := storage.SectorRef{
ID: abi.SectorID{Miner: miner, Number: abi.SectorNumber(i + 1)},
ProofType: sealProofType,
}
s := seal{ref: si}
s.precommit(t, sb, si, func() {})
toAggregate[i] = s.commit(t, sb, func() {})
avi.Infos[i] = proof5.AggregateSealVerifyInfo{
Number: abi.SectorNumber(i + 1),
Randomness: s.ticket,
InteractiveRandomness: seed,
SealedCID: s.cids.Sealed,
UnsealedCID: s.cids.Unsealed,
}
}
aggStart := time.Now()
avi.Proof, err = ProofVerifier.AggregateSealProofs(sealProofType, toAggregate)
require.NoError(t, err)
aggDone := time.Now()
_, err = ProofVerifier.AggregateSealProofs(sealProofType, toAggregate)
require.NoError(t, err)
aggHot := time.Now()
ok, err := ProofVerifier.VerifyAggregateSeals(avi)
require.NoError(t, err)
require.True(t, ok)
verifDone := time.Now()
fmt.Printf("Aggregate: %s\n", aggDone.Sub(aggStart).String())
fmt.Printf("Hot: %s\n", aggHot.Sub(aggDone).String())
fmt.Printf("Verify: %s\n", verifDone.Sub(aggHot).String())
}
func BenchmarkWriteWithAlignment(b *testing.B) {
bt := abi.UnpaddedPieceSize(2 * 127 * 1024 * 1024)
b.SetBytes(int64(bt))

View File

@ -4,7 +4,7 @@ import (
"context"
"io"
proof2 "github.com/filecoin-project/specs-actors/v2/actors/runtime/proof"
proof5 "github.com/filecoin-project/specs-actors/v5/actors/runtime/proof"
"github.com/ipfs/go-cid"
@ -34,11 +34,15 @@ type Storage interface {
}
type Verifier interface {
VerifySeal(proof2.SealVerifyInfo) (bool, error)
VerifyWinningPoSt(ctx context.Context, info proof2.WinningPoStVerifyInfo) (bool, error)
VerifyWindowPoSt(ctx context.Context, info proof2.WindowPoStVerifyInfo) (bool, error)
VerifySeal(proof5.SealVerifyInfo) (bool, error)
VerifyAggregateSeals(aggregate proof5.AggregateSealVerifyProofAndInfos) (bool, error)
VerifyWinningPoSt(ctx context.Context, info proof5.WinningPoStVerifyInfo) (bool, error)
VerifyWindowPoSt(ctx context.Context, info proof5.WindowPoStVerifyInfo) (bool, error)
GenerateWinningPoStSectorChallenge(context.Context, abi.RegisteredPoStProof, abi.ActorID, abi.PoStRandomness, uint64) ([]uint64, error)
// cheap, makes no sense to put this on the storage interface
AggregateSealProofs(proofType abi.RegisteredSealProof, proofs [][]byte) ([]byte, error)
}
type SectorProvider interface {

View File

@ -10,13 +10,13 @@ import (
ffi "github.com/filecoin-project/filecoin-ffi"
"github.com/filecoin-project/go-state-types/abi"
proof2 "github.com/filecoin-project/specs-actors/v2/actors/runtime/proof"
proof5 "github.com/filecoin-project/specs-actors/v5/actors/runtime/proof"
"github.com/filecoin-project/specs-storage/storage"
"github.com/filecoin-project/lotus/extern/sector-storage/storiface"
)
func (sb *Sealer) GenerateWinningPoSt(ctx context.Context, minerID abi.ActorID, sectorInfo []proof2.SectorInfo, randomness abi.PoStRandomness) ([]proof2.PoStProof, error) {
func (sb *Sealer) GenerateWinningPoSt(ctx context.Context, minerID abi.ActorID, sectorInfo []proof5.SectorInfo, randomness abi.PoStRandomness) ([]proof5.PoStProof, error) {
randomness[31] &= 0x3f
privsectors, skipped, done, err := sb.pubSectorToPriv(ctx, minerID, sectorInfo, nil, abi.RegisteredSealProof.RegisteredWinningPoStProof) // TODO: FAULTS?
if err != nil {
@ -30,7 +30,7 @@ func (sb *Sealer) GenerateWinningPoSt(ctx context.Context, minerID abi.ActorID,
return ffi.GenerateWinningPoSt(minerID, privsectors, randomness)
}
func (sb *Sealer) GenerateWindowPoSt(ctx context.Context, minerID abi.ActorID, sectorInfo []proof2.SectorInfo, randomness abi.PoStRandomness) ([]proof2.PoStProof, []abi.SectorID, error) {
func (sb *Sealer) GenerateWindowPoSt(ctx context.Context, minerID abi.ActorID, sectorInfo []proof5.SectorInfo, randomness abi.PoStRandomness) ([]proof5.PoStProof, []abi.SectorID, error) {
randomness[31] &= 0x3f
privsectors, skipped, done, err := sb.pubSectorToPriv(ctx, minerID, sectorInfo, nil, abi.RegisteredSealProof.RegisteredWindowPoStProof)
if err != nil {
@ -55,7 +55,7 @@ func (sb *Sealer) GenerateWindowPoSt(ctx context.Context, minerID abi.ActorID, s
return proof, faultyIDs, err
}
func (sb *Sealer) pubSectorToPriv(ctx context.Context, mid abi.ActorID, sectorInfo []proof2.SectorInfo, faults []abi.SectorNumber, rpt func(abi.RegisteredSealProof) (abi.RegisteredPoStProof, error)) (ffi.SortedPrivateSectorInfo, []abi.SectorID, func(), error) {
func (sb *Sealer) pubSectorToPriv(ctx context.Context, mid abi.ActorID, sectorInfo []proof5.SectorInfo, faults []abi.SectorNumber, rpt func(abi.RegisteredSealProof) (abi.RegisteredPoStProof, error)) (ffi.SortedPrivateSectorInfo, []abi.SectorID, func(), error) {
fmap := map[abi.SectorNumber]struct{}{}
for _, fault := range faults {
fmap[fault] = struct{}{}
@ -111,11 +111,15 @@ type proofVerifier struct{}
var ProofVerifier = proofVerifier{}
func (proofVerifier) VerifySeal(info proof2.SealVerifyInfo) (bool, error) {
func (proofVerifier) VerifySeal(info proof5.SealVerifyInfo) (bool, error) {
return ffi.VerifySeal(info)
}
func (proofVerifier) VerifyWinningPoSt(ctx context.Context, info proof2.WinningPoStVerifyInfo) (bool, error) {
func (proofVerifier) VerifyAggregateSeals(aggregate proof5.AggregateSealVerifyProofAndInfos) (bool, error) {
return ffi.VerifyAggregateSeals(aggregate)
}
func (proofVerifier) VerifyWinningPoSt(ctx context.Context, info proof5.WinningPoStVerifyInfo) (bool, error) {
info.Randomness[31] &= 0x3f
_, span := trace.StartSpan(ctx, "VerifyWinningPoSt")
defer span.End()
@ -123,7 +127,7 @@ func (proofVerifier) VerifyWinningPoSt(ctx context.Context, info proof2.WinningP
return ffi.VerifyWinningPoSt(info)
}
func (proofVerifier) VerifyWindowPoSt(ctx context.Context, info proof2.WindowPoStVerifyInfo) (bool, error) {
func (proofVerifier) VerifyWindowPoSt(ctx context.Context, info proof5.WindowPoStVerifyInfo) (bool, error) {
info.Randomness[31] &= 0x3f
_, span := trace.StartSpan(ctx, "VerifyWindowPoSt")
defer span.End()
@ -135,3 +139,7 @@ func (proofVerifier) GenerateWinningPoStSectorChallenge(ctx context.Context, pro
randomness[31] &= 0x3f
return ffi.GenerateWinningPoStSectorChallenge(proofType, minerID, randomness, eligibleSectorCount)
}
func (v proofVerifier) AggregateSealProofs(proofType abi.RegisteredSealProof, proofs [][]byte) ([]byte, error) {
return ffi.AggregateSealProofs(proofType, proofs)
}

View File

@ -9,7 +9,7 @@ import (
"math/rand"
"sync"
proof2 "github.com/filecoin-project/specs-actors/v2/actors/runtime/proof"
proof5 "github.com/filecoin-project/specs-actors/v5/actors/runtime/proof"
ffiwrapper2 "github.com/filecoin-project/go-commp-utils/ffiwrapper"
commcid "github.com/filecoin-project/go-fil-commcid"
@ -300,14 +300,14 @@ func AddOpFinish(ctx context.Context) (context.Context, func()) {
}
}
func (mgr *SectorMgr) GenerateWinningPoSt(ctx context.Context, minerID abi.ActorID, sectorInfo []proof2.SectorInfo, randomness abi.PoStRandomness) ([]proof2.PoStProof, error) {
func (mgr *SectorMgr) GenerateWinningPoSt(ctx context.Context, minerID abi.ActorID, sectorInfo []proof5.SectorInfo, randomness abi.PoStRandomness) ([]proof5.PoStProof, error) {
mgr.lk.Lock()
defer mgr.lk.Unlock()
return generateFakePoSt(sectorInfo, abi.RegisteredSealProof.RegisteredWinningPoStProof, randomness), nil
}
func (mgr *SectorMgr) GenerateWindowPoSt(ctx context.Context, minerID abi.ActorID, sectorInfo []proof2.SectorInfo, randomness abi.PoStRandomness) ([]proof2.PoStProof, []abi.SectorID, error) {
func (mgr *SectorMgr) GenerateWindowPoSt(ctx context.Context, minerID abi.ActorID, sectorInfo []proof5.SectorInfo, randomness abi.PoStRandomness) ([]proof5.PoStProof, []abi.SectorID, error) {
mgr.lk.Lock()
defer mgr.lk.Unlock()
@ -315,7 +315,8 @@ func (mgr *SectorMgr) GenerateWindowPoSt(ctx context.Context, minerID abi.ActorI
return nil, nil, xerrors.Errorf("failed to post (mock)")
}
si := make([]proof2.SectorInfo, 0, len(sectorInfo))
si := make([]proof5.SectorInfo, 0, len(sectorInfo))
var skipped []abi.SectorID
var err error
@ -343,7 +344,7 @@ func (mgr *SectorMgr) GenerateWindowPoSt(ctx context.Context, minerID abi.ActorI
return generateFakePoSt(si, abi.RegisteredSealProof.RegisteredWindowPoStProof, randomness), skipped, nil
}
func generateFakePoStProof(sectorInfo []proof2.SectorInfo, randomness abi.PoStRandomness) []byte {
func generateFakePoStProof(sectorInfo []proof5.SectorInfo, randomness abi.PoStRandomness) []byte {
randomness[31] &= 0x3f
hasher := sha256.New()
@ -358,13 +359,13 @@ func generateFakePoStProof(sectorInfo []proof2.SectorInfo, randomness abi.PoStRa
}
func generateFakePoSt(sectorInfo []proof2.SectorInfo, rpt func(abi.RegisteredSealProof) (abi.RegisteredPoStProof, error), randomness abi.PoStRandomness) []proof2.PoStProof {
func generateFakePoSt(sectorInfo []proof5.SectorInfo, rpt func(abi.RegisteredSealProof) (abi.RegisteredPoStProof, error), randomness abi.PoStRandomness) []proof5.PoStProof {
wp, err := rpt(sectorInfo[0].SealProof)
if err != nil {
panic(err)
}
return []proof2.PoStProof{
return []proof5.PoStProof{
{
PoStProof: wp,
ProofBytes: generateFakePoStProof(sectorInfo, randomness),
@ -489,7 +490,7 @@ func (mgr *SectorMgr) ReturnFetch(ctx context.Context, callID storiface.CallID,
panic("not supported")
}
func (m mockVerif) VerifySeal(svi proof2.SealVerifyInfo) (bool, error) {
func (m mockVerif) VerifySeal(svi proof5.SealVerifyInfo) (bool, error) {
plen, err := svi.SealProof.ProofSize()
if err != nil {
return false, err
@ -501,6 +502,7 @@ func (m mockVerif) VerifySeal(svi proof2.SealVerifyInfo) (bool, error) {
// only the first 32 bytes, the rest are 0.
for i, b := range svi.Proof[:32] {
// unsealed+sealed-seed*ticket
if b != svi.UnsealedCID.Bytes()[i]+svi.SealedCID.Bytes()[31-i]-svi.InteractiveRandomness[i]*svi.Randomness[i] {
return false, nil
}
@ -509,12 +511,35 @@ func (m mockVerif) VerifySeal(svi proof2.SealVerifyInfo) (bool, error) {
return true, nil
}
func (m mockVerif) VerifyWinningPoSt(ctx context.Context, info proof2.WinningPoStVerifyInfo) (bool, error) {
func (m mockVerif) VerifyAggregateSeals(aggregate proof5.AggregateSealVerifyProofAndInfos) (bool, error) {
out := make([]byte, 200)
for pi, svi := range aggregate.Infos {
for i := 0; i < 32; i++ {
b := svi.UnsealedCID.Bytes()[i] + svi.SealedCID.Bytes()[31-i] - svi.InteractiveRandomness[i]*svi.Randomness[i] // raw proof byte
b *= uint8(pi) // with aggregate index
out[i] += b
}
}
return bytes.Equal(aggregate.Proof, out), nil
}
func (m mockVerif) AggregateSealProofs(proofType abi.RegisteredSealProof, proofs [][]byte) ([]byte, error) {
out := make([]byte, 200) // todo: figure out more real length
for pi, proof := range proofs {
for i := range proof[:32] {
out[i] += proof[i] * uint8(pi)
}
}
return out, nil
}
func (m mockVerif) VerifyWinningPoSt(ctx context.Context, info proof5.WinningPoStVerifyInfo) (bool, error) {
info.Randomness[31] &= 0x3f
return true, nil
}
func (m mockVerif) VerifyWindowPoSt(ctx context.Context, info proof2.WindowPoStVerifyInfo) (bool, error) {
func (m mockVerif) VerifyWindowPoSt(ctx context.Context, info proof5.WindowPoStVerifyInfo) (bool, error) {
if len(info.Proofs) != 1 {
return false, xerrors.Errorf("expected 1 proof entry")
}

271
extern/storage-sealing/commit_batch.go vendored Normal file
View File

@ -0,0 +1,271 @@
package sealing
import (
"bytes"
"context"
"sort"
"sync"
"time"
"github.com/ipfs/go-cid"
"golang.org/x/xerrors"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-bitfield"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/go-state-types/big"
miner5 "github.com/filecoin-project/specs-actors/v5/actors/builtin/miner"
proof5 "github.com/filecoin-project/specs-actors/v5/actors/runtime/proof"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/chain/actors/builtin/miner"
"github.com/filecoin-project/lotus/extern/sector-storage/ffiwrapper"
)
var (
// TODO: config!
CommitBatchWait = 5 * time.Minute
)
type CommitBatcherApi interface {
SendMsg(ctx context.Context, from, to address.Address, method abi.MethodNum, value, maxFee abi.TokenAmount, params []byte) (cid.Cid, error)
StateMinerInfo(context.Context, address.Address, TipSetToken) (miner.MinerInfo, error)
}
type AggregateInput struct {
// TODO: Something changed in actors, I think this now needs to be AggregateSealVerifyProofAndInfos
info proof5.AggregateSealVerifyInfo
proof []byte
}
type CommitBatcher struct {
api CommitBatcherApi
maddr address.Address
mctx context.Context
addrSel AddrSel
feeCfg FeeConfig
getConfig GetSealingConfigFunc
verif ffiwrapper.Verifier
todo map[abi.SectorNumber]AggregateInput
waiting map[abi.SectorNumber][]chan cid.Cid
notify, stop, stopped chan struct{}
force chan chan *cid.Cid
lk sync.Mutex
}
func NewCommitBatcher(mctx context.Context, maddr address.Address, api CommitBatcherApi, addrSel AddrSel, feeCfg FeeConfig, getConfig GetSealingConfigFunc, verif ffiwrapper.Verifier) *CommitBatcher {
b := &CommitBatcher{
api: api,
maddr: maddr,
mctx: mctx,
addrSel: addrSel,
feeCfg: feeCfg,
getConfig: getConfig,
verif: verif,
todo: map[abi.SectorNumber]AggregateInput{},
waiting: map[abi.SectorNumber][]chan cid.Cid{},
notify: make(chan struct{}, 1),
force: make(chan chan *cid.Cid),
stop: make(chan struct{}),
stopped: make(chan struct{}),
}
go b.run()
return b
}
func (b *CommitBatcher) run() {
var forceRes chan *cid.Cid
var lastMsg *cid.Cid
for {
if forceRes != nil {
forceRes <- lastMsg
forceRes = nil
}
lastMsg = nil
var sendAboveMax, sendAboveMin bool
select {
case <-b.stop:
close(b.stopped)
return
case <-b.notify:
sendAboveMax = true
case <-time.After(TerminateBatchWait):
sendAboveMin = true
case fr := <-b.force: // user triggered
forceRes = fr
}
var err error
lastMsg, err = b.processBatch(sendAboveMax, sendAboveMin)
if err != nil {
log.Warnw("TerminateBatcher processBatch error", "error", err)
}
}
}
func (b *CommitBatcher) processBatch(notif, after bool) (*cid.Cid, error) {
b.lk.Lock()
defer b.lk.Unlock()
params := miner5.ProveCommitAggregateParams{
SectorNumbers: bitfield.New(),
}
total := len(b.todo)
if total == 0 {
return nil, nil // nothing to do
}
cfg, err := b.getConfig()
if err != nil {
return nil, xerrors.Errorf("getting config: %w", err)
}
if notif && total < cfg.MaxCommitBatch {
return nil, nil
}
if after && total < cfg.MinCommitBatch {
return nil, nil
}
spt := b.todo[0].info.SealProof
proofs := make([][]byte, total)
for id, p := range b.todo {
if p.info.SealProof != spt {
// todo: handle when we'll have proof upgrade
return nil, xerrors.Errorf("different seal proof types in commit batch: %w", err)
}
params.SectorNumbers.Set(uint64(id))
proofs[id] = p.proof
}
params.AggregateProof, err = b.verif.AggregateSealProofs(spt, proofs)
if err != nil {
return nil, xerrors.Errorf("aggregating proofs: %w", err)
}
enc := new(bytes.Buffer)
if err := params.MarshalCBOR(enc); err != nil {
return nil, xerrors.Errorf("couldn't serialize TerminateSectors params: %w", err)
}
mi, err := b.api.StateMinerInfo(b.mctx, b.maddr, nil)
if err != nil {
return nil, xerrors.Errorf("couldn't get miner info: %w", err)
}
from, _, err := b.addrSel(b.mctx, mi, api.CommitAddr, b.feeCfg.MaxCommitGasFee, b.feeCfg.MaxCommitGasFee)
if err != nil {
return nil, xerrors.Errorf("no good address found: %w", err)
}
mcid, err := b.api.SendMsg(b.mctx, from, b.maddr, miner.Methods.ProveCommitAggregate, big.Zero(), b.feeCfg.MaxCommitGasFee, enc.Bytes())
if err != nil {
return nil, xerrors.Errorf("sending message failed: %w", err)
}
log.Infow("Sent ProveCommitAggregate message", "cid", mcid, "from", from, "sectors", total)
err = params.SectorNumbers.ForEach(func(us uint64) error {
sn := abi.SectorNumber(us)
for _, ch := range b.waiting[sn] {
ch <- mcid // buffered
}
delete(b.waiting, sn)
delete(b.todo, sn)
return nil
})
if err != nil {
return nil, xerrors.Errorf("done sectors foreach: %w", err)
}
return &mcid, nil
}
// register commit, wait for batch message, return message CID
func (b *CommitBatcher) AddCommit(ctx context.Context, s abi.SectorNumber, in AggregateInput) (mcid cid.Cid, err error) {
b.lk.Lock()
b.todo[s] = in
sent := make(chan cid.Cid, 1)
b.waiting[s] = append(b.waiting[s], sent)
select {
case b.notify <- struct{}{}:
default: // already have a pending notification, don't need more
}
b.lk.Unlock()
select {
case c := <-sent:
return c, nil
case <-ctx.Done():
return cid.Undef, ctx.Err()
}
}
func (b *CommitBatcher) Flush(ctx context.Context) (*cid.Cid, error) {
resCh := make(chan *cid.Cid, 1)
select {
case b.force <- resCh:
select {
case res := <-resCh:
return res, nil
case <-ctx.Done():
return nil, ctx.Err()
}
case <-ctx.Done():
return nil, ctx.Err()
}
}
func (b *CommitBatcher) Pending(ctx context.Context) ([]abi.SectorID, error) {
b.lk.Lock()
defer b.lk.Unlock()
mid, err := address.IDFromAddress(b.maddr)
if err != nil {
return nil, err
}
res := make([]abi.SectorID, 0)
for _, s := range b.todo {
res = append(res, abi.SectorID{
Miner: abi.ActorID(mid),
Number: s.info.Number,
})
}
sort.Slice(res, func(i, j int) bool {
if res[i].Miner != res[j].Miner {
return res[i].Miner < res[j].Miner
}
return res[i].Number < res[j].Number
})
return res, nil
}
func (b *CommitBatcher) Stop(ctx context.Context) error {
close(b.stop)
select {
case <-b.stopped:
return nil
case <-ctx.Done():
return ctx.Err()
}
}

View File

@ -91,12 +91,22 @@ var fsmPlanners = map[SectorState]func(events []statemachine.Event, state *Secto
SubmitCommit: planOne(
on(SectorCommitSubmitted{}, CommitWait),
on(SectorCommitFailed{}, CommitFailed),
on(SectorSubmitCommitAggregate{}, SubmitCommitAggregate),
),
SubmitCommitAggregate: planOne(
on(SectorCommitAggregateSent{}, CommitWait),
on(SectorCommitFailed{}, CommitFailed),
),
CommitWait: planOne(
on(SectorProving{}, FinalizeSector),
on(SectorCommitFailed{}, CommitFailed),
on(SectorRetrySubmitCommit{}, SubmitCommit),
),
CommitAggregateWait: planOne(
on(SectorProving{}, FinalizeSector),
on(SectorCommitFailed{}, CommitFailed),
on(SectorRetrySubmitCommit{}, SubmitCommit),
),
FinalizeSector: planOne(
on(SectorFinalized{}, Proving),
@ -338,8 +348,12 @@ func (m *Sealing) plan(events []statemachine.Event, state *SectorInfo) (func(sta
return m.handleCommitting, processed, nil
case SubmitCommit:
return m.handleSubmitCommit, processed, nil
case CommitAggregateWait:
fallthrough
case CommitWait:
return m.handleCommitWait, processed, nil
case SubmitCommitAggregate:
return m.handleSubmitCommitAggregate, processed, nil
case FinalizeSector:
return m.handleFinalizeSector, processed, nil

View File

@ -233,6 +233,10 @@ func (evt SectorCommitted) apply(state *SectorInfo) {
state.Proof = evt.Proof
}
type SectorSubmitCommitAggregate struct{}
func (evt SectorSubmitCommitAggregate) apply(*SectorInfo) {}
type SectorCommitSubmitted struct {
Message cid.Cid
}
@ -241,6 +245,14 @@ func (evt SectorCommitSubmitted) apply(state *SectorInfo) {
state.CommitMessage = &evt.Message
}
type SectorCommitAggregateSent struct {
Message cid.Cid
}
func (evt SectorCommitAggregateSent) apply(state *SectorInfo) {
state.CommitMessage = &evt.Message
}
type SectorProving struct{}
func (evt SectorProving) apply(*SectorInfo) {}

View File

@ -17,4 +17,8 @@ type Config struct {
WaitDealsDelay time.Duration
AlwaysKeepUnsealedCopy bool
AggregateCommits bool
MinCommitBatch int
MaxCommitBatch int
}

View File

@ -103,6 +103,7 @@ type Sealing struct {
stats SectorStats
terminator *TerminateBatcher
commiter *CommitBatcher
getConfig GetSealingConfigFunc
dealInfo *CurrentDealInfoManager
@ -152,6 +153,7 @@ func New(api SealingAPI, fc FeeConfig, events Events, maddr address.Address, ds
addrSel: as,
terminator: NewTerminationBatcher(context.TODO(), maddr, api, as, fc),
commiter: NewCommitBatcher(context.TODO(), maddr, api, as, fc, gc, verif),
getConfig: gc,
dealInfo: &CurrentDealInfoManager{api},
@ -202,6 +204,14 @@ func (m *Sealing) TerminatePending(ctx context.Context) ([]abi.SectorID, error)
return m.terminator.Pending(ctx)
}
func (m *Sealing) CommitFlush(ctx context.Context) (*cid.Cid, error) {
return m.commiter.Flush(ctx)
}
func (m *Sealing) CommitPending(ctx context.Context) ([]abi.SectorID, error) {
return m.commiter.Pending(ctx)
}
func (m *Sealing) currentSealProof(ctx context.Context) (abi.RegisteredSealProof, error) {
mi, err := m.api.StateMinerInfo(ctx, m.maddr, nil)
if err != nil {

View File

@ -17,6 +17,8 @@ var ExistSectorStateList = map[SectorState]struct{}{
Committing: {},
SubmitCommit: {},
CommitWait: {},
SubmitCommitAggregate: {},
CommitAggregateWait: {},
FinalizeSector: {},
Proving: {},
FailedUnrecoverable: {},
@ -56,8 +58,14 @@ const (
PreCommitWait SectorState = "PreCommitWait" // waiting for precommit to land on chain
WaitSeed SectorState = "WaitSeed" // waiting for seed
Committing SectorState = "Committing" // compute PoRep
// single commit
SubmitCommit SectorState = "SubmitCommit" // send commit message to the chain
CommitWait SectorState = "CommitWait" // wait for the commit message to land on chain
SubmitCommitAggregate SectorState = "SubmitCommitAggregate"
CommitAggregateWait SectorState = "CommitAggregateWait"
FinalizeSector SectorState = "FinalizeSector"
Proving SectorState = "Proving"
// error modes
@ -91,7 +99,7 @@ func toStatState(st SectorState) statSectorState {
switch st {
case UndefinedSectorState, Empty, WaitDeals, AddPiece:
return sstStaging
case Packing, GetTicket, PreCommit1, PreCommit2, PreCommitting, PreCommitWait, WaitSeed, Committing, SubmitCommit, CommitWait, FinalizeSector:
case Packing, GetTicket, PreCommit1, PreCommit2, PreCommitting, PreCommitWait, WaitSeed, Committing, SubmitCommit, CommitWait, SubmitCommitAggregate, CommitAggregateWait, FinalizeSector:
return sstSealing
case Proving, Removed, Removing, Terminating, TerminateWait, TerminateFinality, TerminateFailed:
return sstProving

View File

@ -12,6 +12,7 @@ import (
"github.com/filecoin-project/go-state-types/crypto"
"github.com/filecoin-project/go-state-types/exitcode"
"github.com/filecoin-project/go-statemachine"
"github.com/filecoin-project/specs-actors/v5/actors/runtime/proof"
"github.com/filecoin-project/specs-storage/storage"
"github.com/filecoin-project/lotus/api"
@ -452,6 +453,14 @@ func (m *Sealing) handleCommitting(ctx statemachine.Context, sector SectorInfo)
}
func (m *Sealing) handleSubmitCommit(ctx statemachine.Context, sector SectorInfo) error {
cfg, err := m.getConfig()
if err != nil {
return xerrors.Errorf("getting config: %w", err)
}
if cfg.AggregateCommits {
return ctx.Send(SectorSubmitCommitAggregate{})
}
tok, _, err := m.api.ChainHead(ctx.Context())
if err != nil {
log.Errorf("handleCommitting: api error, not proceeding: %+v", err)
@ -514,6 +523,29 @@ func (m *Sealing) handleSubmitCommit(ctx statemachine.Context, sector SectorInfo
})
}
func (m *Sealing) handleSubmitCommitAggregate(ctx statemachine.Context, sector SectorInfo) error {
if sector.CommD == nil || sector.CommR == nil {
return ctx.Send(SectorCommitFailed{xerrors.Errorf("sector had nil commR or commD")})
}
mcid, err := m.commiter.AddCommit(ctx.Context(), sector.SectorNumber, AggregateInput{
info: proof.AggregateSealVerifyInfo{
Number: sector.SectorNumber,
DealIDs: sector.dealIDs(),
Randomness: sector.TicketValue,
InteractiveRandomness: sector.SeedValue,
SealedCID: *sector.CommR,
UnsealedCID: *sector.CommD,
},
proof: sector.Proof, // todo: this correct??
})
if err != nil {
return ctx.Send(SectorCommitFailed{xerrors.Errorf("queuing commit for aggregation failed: %w", err)})
}
return ctx.Send(SectorCommitAggregateSent{mcid})
}
func (m *Sealing) handleCommitWait(ctx statemachine.Context, sector SectorInfo) error {
if sector.CommitMessage == nil {
log.Errorf("sector %d entered commit wait state without a message cid", sector.SectorNumber)

4
go.mod
View File

@ -38,7 +38,7 @@ require (
github.com/filecoin-project/go-jsonrpc v0.1.4-0.20210217175800-45ea43ac2bec
github.com/filecoin-project/go-multistore v0.0.3
github.com/filecoin-project/go-padreader v0.0.0-20200903213702-ed5fae088b20
github.com/filecoin-project/go-paramfetch v0.0.2-0.20200701152213-3e0f0afdc261
github.com/filecoin-project/go-paramfetch v0.0.2-0.20210330140417-936748d3f5ec
github.com/filecoin-project/go-state-types v0.1.1-0.20210506134452-99b279731c48
github.com/filecoin-project/go-statemachine v0.0.0-20200925024713-05bd7c71fbfe
github.com/filecoin-project/go-statestore v0.1.1
@ -47,7 +47,7 @@ require (
github.com/filecoin-project/specs-actors/v2 v2.3.5-0.20210114162132-5b58b773f4fb
github.com/filecoin-project/specs-actors/v3 v3.1.0
github.com/filecoin-project/specs-actors/v4 v4.0.0
github.com/filecoin-project/specs-actors/v5 v5.0.0-20210510162709-3255bdd9f2bb
github.com/filecoin-project/specs-actors/v5 v5.0.0-20210512015452-4fe3889fff57
github.com/filecoin-project/specs-storage v0.1.1-0.20201105051918-5188d9774506
github.com/filecoin-project/test-vectors/schema v0.0.5
github.com/gbrlsnchs/jwt/v3 v3.0.0-beta.1

10
go.sum
View File

@ -287,8 +287,8 @@ github.com/filecoin-project/go-multistore v0.0.3 h1:vaRBY4YiA2UZFPK57RNuewypB8u0
github.com/filecoin-project/go-multistore v0.0.3/go.mod h1:kaNqCC4IhU4B1uyr7YWFHd23TL4KM32aChS0jNkyUvQ=
github.com/filecoin-project/go-padreader v0.0.0-20200903213702-ed5fae088b20 h1:+/4aUeUoKr6AKfPE3mBhXA5spIV6UcKdTYDPNU2Tdmg=
github.com/filecoin-project/go-padreader v0.0.0-20200903213702-ed5fae088b20/go.mod h1:mPn+LRRd5gEKNAtc+r3ScpW2JRU/pj4NBKdADYWHiak=
github.com/filecoin-project/go-paramfetch v0.0.2-0.20200701152213-3e0f0afdc261 h1:A256QonvzRaknIIAuWhe/M2dpV2otzs3NBhi5TWa/UA=
github.com/filecoin-project/go-paramfetch v0.0.2-0.20200701152213-3e0f0afdc261/go.mod h1:fZzmf4tftbwf9S37XRifoJlz7nCjRdIrMGLR07dKLCc=
github.com/filecoin-project/go-paramfetch v0.0.2-0.20210330140417-936748d3f5ec h1:gExwWUiT1TcARkxGneS4nvp9C+wBsKU0bFdg7qFpNco=
github.com/filecoin-project/go-paramfetch v0.0.2-0.20210330140417-936748d3f5ec/go.mod h1:fZzmf4tftbwf9S37XRifoJlz7nCjRdIrMGLR07dKLCc=
github.com/filecoin-project/go-state-types v0.0.0-20200903145444-247639ffa6ad/go.mod h1:IQ0MBPnonv35CJHtWSN3YY1Hz2gkPru1Q9qoaYLxx9I=
github.com/filecoin-project/go-state-types v0.0.0-20200904021452-1883f36ca2f4/go.mod h1:IQ0MBPnonv35CJHtWSN3YY1Hz2gkPru1Q9qoaYLxx9I=
github.com/filecoin-project/go-state-types v0.0.0-20200928172055-2df22083d8ab/go.mod h1:ezYnPf0bNkTsDibL/psSz5dy4B5awOJ/E7P2Saeep8g=
@ -310,14 +310,16 @@ github.com/filecoin-project/specs-actors v0.9.13 h1:rUEOQouefi9fuVY/2HOroROJlZbO
github.com/filecoin-project/specs-actors v0.9.13/go.mod h1:TS1AW/7LbG+615j4NsjMK1qlpAwaFsG9w0V2tg2gSao=
github.com/filecoin-project/specs-actors/v2 v2.0.1/go.mod h1:v2NZVYinNIKA9acEMBm5wWXxqv5+frFEbekBFemYghY=
github.com/filecoin-project/specs-actors/v2 v2.3.2/go.mod h1:UuJQLoTx/HPvvWeqlIFmC/ywlOLHNe8SNQ3OunFbu2Y=
github.com/filecoin-project/specs-actors/v2 v2.3.4/go.mod h1:UuJQLoTx/HPvvWeqlIFmC/ywlOLHNe8SNQ3OunFbu2Y=
github.com/filecoin-project/specs-actors/v2 v2.3.5-0.20210114162132-5b58b773f4fb h1:orr/sMzrDZUPAveRE+paBdu1kScIUO5zm+HYeh+VlhA=
github.com/filecoin-project/specs-actors/v2 v2.3.5-0.20210114162132-5b58b773f4fb/go.mod h1:LljnY2Mn2homxZsmokJZCpRuhOPxfXhvcek5gWkmqAc=
github.com/filecoin-project/specs-actors/v3 v3.0.4-0.20210227000520-b3317b86f4d1/go.mod h1:oMcmEed6B7H/wHabM3RQphTIhq0ibAKsbpYs+bQ/uxQ=
github.com/filecoin-project/specs-actors/v3 v3.1.0 h1:s4qiPw8pgypqBGAy853u/zdZJ7K9cTZdM1rTiSonHrg=
github.com/filecoin-project/specs-actors/v3 v3.1.0/go.mod h1:mpynccOLlIRy0QnR008BwYBwT9fen+sPR13MA1VmMww=
github.com/filecoin-project/specs-actors/v4 v4.0.0 h1:vMALksY5G3J5rj3q9rbcyB+f4Tk1xrLqSgdB3jOok4s=
github.com/filecoin-project/specs-actors/v4 v4.0.0/go.mod h1:TkHXf/l7Wyw4ZejyXIPS2rK8bBO0rdwhTZyQQgaglng=
github.com/filecoin-project/specs-actors/v5 v5.0.0-20210510162709-3255bdd9f2bb h1:i2ZBHLiNYyyhNlfjfB/TGtGLlb8dgiGiVCDZlGpUtUc=
github.com/filecoin-project/specs-actors/v5 v5.0.0-20210510162709-3255bdd9f2bb/go.mod h1:XAgQWq5pu0MBwx3MI5uJ6fK/Q8jCkZnKNNLxvDcbXew=
github.com/filecoin-project/specs-actors/v5 v5.0.0-20210512015452-4fe3889fff57 h1:N6IBsnGXfAMXd677G6EiOKewFwQ7Ulcuupi4U6wYmXE=
github.com/filecoin-project/specs-actors/v5 v5.0.0-20210512015452-4fe3889fff57/go.mod h1:283yBMMUSDB2abcjP/hhrwTkhb9h3sfM6KGrep/ZlBI=
github.com/filecoin-project/specs-storage v0.1.1-0.20201105051918-5188d9774506 h1:Ur/l2+6qN+lQiqjozWWc5p9UDaAMDZKTlDS98oRnlIw=
github.com/filecoin-project/specs-storage v0.1.1-0.20201105051918-5188d9774506/go.mod h1:nJRRM7Aa9XVvygr3W9k6xGF46RWzr2zxF/iGoAIfA/g=
github.com/filecoin-project/test-vectors/schema v0.0.5 h1:w3zHQhzM4pYxJDl21avXjOKBLF8egrvwUwjpT8TquDg=

View File

@ -281,7 +281,8 @@
"ConfirmUpdateWorkerKey",
"RepayDebt",
"ChangeOwnerAddress",
"DisputeWindowedPoSt"
"DisputeWindowedPoSt",
"ProveCommitAggregate"
],
"fil/3/storagepower": [
"Send",

View File

@ -82,6 +82,10 @@ type SealingConfig struct {
AlwaysKeepUnsealedCopy bool
AggregateCommits bool
MinCommitBatch int
MaxCommitBatch int
// Keep this many sectors in sealing pipeline, start CC if needed
// todo TargetSealingSectors uint64
@ -237,6 +241,10 @@ func DefaultStorageMiner() *StorageMiner {
MaxSealingSectorsForDeals: 0,
WaitDealsDelay: Duration(time.Hour * 6),
AlwaysKeepUnsealedCopy: true,
AggregateCommits: true,
MinCommitBatch: 5, // todo: base this on some real numbers
MaxCommitBatch: 400,
},
Storage: sectorstorage.SealerConfig{

View File

@ -378,6 +378,14 @@ func (sm *StorageMinerAPI) SectorMarkForUpgrade(ctx context.Context, id abi.Sect
return sm.Miner.MarkForUpgrade(id)
}
func (sm *StorageMinerAPI) SectorCommitFlush(ctx context.Context) (*cid.Cid, error) {
return sm.Miner.CommitFlush(ctx)
}
func (sm *StorageMinerAPI) SectorCommitPending(ctx context.Context) ([]abi.SectorID, error) {
return sm.Miner.CommitPending(ctx)
}
func (sm *StorageMinerAPI) WorkerConnect(ctx context.Context, url string) error {
w, err := connectRemoteWorker(ctx, sm, url)
if err != nil {

View File

@ -99,7 +99,7 @@ func GetParams(spt abi.RegisteredSealProof) error {
}
// TODO: We should fetch the params for the actual proof type, not just based on the size.
if err := paramfetch.GetParams(context.TODO(), build.ParametersJSON(), uint64(ssize)); err != nil {
if err := paramfetch.GetParams(context.TODO(), build.ParametersJSON(), build.SrsJSON(), uint64(ssize)); err != nil {
return xerrors.Errorf("fetching proof parameters: %w", err)
}
@ -824,6 +824,9 @@ func NewSetSealConfigFunc(r repo.LockedRepo) (dtypes.SetSealingConfigFunc, error
MaxSealingSectorsForDeals: cfg.MaxSealingSectorsForDeals,
WaitDealsDelay: config.Duration(cfg.WaitDealsDelay),
AlwaysKeepUnsealedCopy: cfg.AlwaysKeepUnsealedCopy,
AggregateCommits: cfg.AggregateCommits,
MinCommitBatch: cfg.MinCommitBatch,
MaxCommitBatch: cfg.MaxCommitBatch,
}
})
return
@ -839,6 +842,9 @@ func NewGetSealConfigFunc(r repo.LockedRepo) (dtypes.GetSealingConfigFunc, error
MaxSealingSectorsForDeals: cfg.Sealing.MaxSealingSectorsForDeals,
WaitDealsDelay: time.Duration(cfg.Sealing.WaitDealsDelay),
AlwaysKeepUnsealedCopy: cfg.Sealing.AlwaysKeepUnsealedCopy,
AggregateCommits: cfg.Sealing.AggregateCommits,
MinCommitBatch: cfg.Sealing.MinCommitBatch,
MaxCommitBatch: cfg.Sealing.MaxCommitBatch,
}
})
return

View File

@ -59,6 +59,14 @@ func (m *Miner) TerminatePending(ctx context.Context) ([]abi.SectorID, error) {
return m.sealing.TerminatePending(ctx)
}
func (m *Miner) CommitFlush(ctx context.Context) (*cid.Cid, error) {
return m.sealing.CommitFlush(ctx)
}
func (m *Miner) CommitPending(ctx context.Context) ([]abi.SectorID, error) {
return m.sealing.CommitPending(ctx)
}
func (m *Miner) MarkForUpgrade(id abi.SectorNumber) error {
return m.sealing.MarkForUpgrade(id)
}