Merge branch 'master' into nonsense/split-market-miner-processes
commit d45bb14015
.circleci/config.yml
@@ -133,6 +133,9 @@ jobs:
+      deadline-test:
+        type: string
+        default: "0"
       proofs-log-test:
         type: string
         default: "0"
       test-suite-name:
         type: string
         default: unit
@@ -167,6 +170,7 @@ jobs:
       environment:
         LOTUS_TEST_WINDOW_POST: << parameters.winpost-test >>
+        LOTUS_TEST_DEADLINE_TOGGLING: << parameters.deadline-test >>
         TEST_RUSTPROOFS_LOGS: << parameters.proofs-log-test >>
         SKIP_CONFORMANCE: "1"
       command: |
         mkdir -p /tmp/test-reports/<< parameters.test-suite-name >>
@@ -212,6 +216,8 @@ jobs:
     <<: *test
   test-terminate:
     <<: *test
+  check-proofs-multicore-sdr:
+    <<: *test
   test-conformance:
     description: |
       Run tests using a corpus of interoperable test vectors for Filecoin
@@ -815,6 +821,12 @@ workflows:
           tags:
             only:
               - /^v\d+\.\d+\.\d+(-rc\d+)?$/
+      - check-proofs-multicore-sdr:
+          codecov-upload: true
+          go-test-flags: "-run=TestMulticoreSDR"
+          test-suite-name: multicore-sdr-check
+          packages: "./extern/sector-storage/ffiwrapper"
+          proofs-log-test: "1"
       - test-conformance:
           test-suite-name: conformance
          packages: "./conformance"
api/full.go
@@ -629,7 +629,7 @@ type FullNode interface {
 	// proposal. This method of approval can be used to ensure you only approve
 	// exactly the transaction you think you are.
 	// It takes the following params: <multisig address>, <proposed message ID>, <proposer address>, <recipient address>, <value to transfer>,
-	// <sender address of the approve msg>, <method to call in the proposed message>, <params to include in the proposed message>
+	// <sender address of the approve msg>, <method to call in the approved message>, <params to include in the proposed message>
 	MsigApproveTxnHash(context.Context, address.Address, uint64, address.Address, address.Address, types.BigInt, address.Address, uint64, []byte) (cid.Cid, error) //perm:sign

 	// MsigCancel cancels a previously-proposed multisig message
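Note: a minimal caller sketch for the method documented above (illustrative only — every value is a placeholder; the parameter comments mirror the corrected doc line, and uint64(miner.Methods.WithdrawBalance) is the same method enum the lotus-shed commands later in this diff pass):

	mcid, err := api.MsigApproveTxnHash(ctx,
		msigAddr,        // <multisig address>
		txnID,           // <proposed message ID>
		proposerAddr,    // <proposer address>
		minerAddr,       // <recipient address>
		types.NewInt(0), // <value to transfer>
		senderAddr,      // <sender address of the approve msg>
		uint64(miner.Methods.WithdrawBalance), // <method to call in the approved message>
		params,          // <params to include in the proposed message>
	)
	if err != nil {
		return xerrors.Errorf("approving txn: %w", err)
	}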
chain/messagepool/check.go
@@ -19,18 +19,18 @@ import (
 var baseFeeUpperBoundFactor = types.NewInt(10)

 // CheckMessages performs a set of logic checks for a list of messages, prior to submitting them to the mpool
-func (mp *MessagePool) CheckMessages(protos []*api.MessagePrototype) ([][]api.MessageCheckStatus, error) {
+func (mp *MessagePool) CheckMessages(ctx context.Context, protos []*api.MessagePrototype) ([][]api.MessageCheckStatus, error) {
 	flex := make([]bool, len(protos))
 	msgs := make([]*types.Message, len(protos))
 	for i, p := range protos {
 		flex[i] = !p.ValidNonce
 		msgs[i] = &p.Message
 	}
-	return mp.checkMessages(msgs, false, flex)
+	return mp.checkMessages(ctx, msgs, false, flex)
 }

 // CheckPendingMessages performs a set of logic checks for all messages pending from a given actor
-func (mp *MessagePool) CheckPendingMessages(from address.Address) ([][]api.MessageCheckStatus, error) {
+func (mp *MessagePool) CheckPendingMessages(ctx context.Context, from address.Address) ([][]api.MessageCheckStatus, error) {
 	var msgs []*types.Message
 	mp.lk.Lock()
 	mset, ok := mp.pending[from]
@@ -49,12 +49,12 @@ func (mp *MessagePool) CheckPendingMessages(from address.Address) ([][]api.Messa
 		return msgs[i].Nonce < msgs[j].Nonce
 	})

-	return mp.checkMessages(msgs, true, nil)
+	return mp.checkMessages(ctx, msgs, true, nil)
 }

 // CheckReplaceMessages performs a set of logical checks for related messages while performing a
 // replacement.
-func (mp *MessagePool) CheckReplaceMessages(replace []*types.Message) ([][]api.MessageCheckStatus, error) {
+func (mp *MessagePool) CheckReplaceMessages(ctx context.Context, replace []*types.Message) ([][]api.MessageCheckStatus, error) {
 	msgMap := make(map[address.Address]map[uint64]*types.Message)
 	count := 0

@@ -94,12 +94,12 @@ func (mp *MessagePool) CheckReplaceMessages(replace []*types.Message) ([][]api.M
 		start = end
 	}

-	return mp.checkMessages(msgs, true, nil)
+	return mp.checkMessages(ctx, msgs, true, nil)
 }

 // flexibleNonces should be either nil or of len(msgs); it signifies that the message at a given index
 // has a non-determined nonce at this point
-func (mp *MessagePool) checkMessages(msgs []*types.Message, interned bool, flexibleNonces []bool) (result [][]api.MessageCheckStatus, err error) {
+func (mp *MessagePool) checkMessages(ctx context.Context, msgs []*types.Message, interned bool, flexibleNonces []bool) (result [][]api.MessageCheckStatus, err error) {
 	if mp.api.IsLite() {
 		return nil, nil
 	}
@@ -160,7 +160,7 @@ func (mp *MessagePool) checkMessages(msgs []*types.Message, interned bool, flexi
 	} else {
 		mp.lk.Unlock()

-		stateNonce, err := mp.getStateNonce(m.From, curTs)
+		stateNonce, err := mp.getStateNonce(ctx, m.From, curTs)
 		if err != nil {
 			check.OK = false
 			check.Err = fmt.Sprintf("error retrieving state nonce: %s", err.Error())
@@ -193,7 +193,7 @@ func (mp *MessagePool) checkMessages(msgs []*types.Message, interned bool, flexi

 		balance, ok := balances[m.From]
 		if !ok {
-			balance, err = mp.getStateBalance(m.From, curTs)
+			balance, err = mp.getStateBalance(ctx, m.From, curTs)
 			if err != nil {
 				check.OK = false
 				check.Err = fmt.Sprintf("error retrieving state balance: %s", err)
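Note: callers now thread a context.Context through the whole check path. A minimal caller sketch (hypothetical; mp is a *MessagePool and protos were built elsewhere):

	// Hypothetical caller: bound the state lookups done during checking.
	ctx, cancel := context.WithTimeout(ctx, 30*time.Second)
	defer cancel()

	statuses, err := mp.CheckMessages(ctx, protos)
	if err != nil {
		return err
	}
	for _, checks := range statuses { // one slice of checks per message
		for _, c := range checks {
			if !c.OK {
				fmt.Printf("mpool check %d failed: %s\n", c.Code, c.Err)
			}
		}
	}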
chain/messagepool/messagepool.go
@@ -34,6 +34,7 @@ import (
 	"github.com/filecoin-project/lotus/chain/vm"
 	"github.com/filecoin-project/lotus/journal"
 	"github.com/filecoin-project/lotus/lib/sigs"
+	"github.com/filecoin-project/lotus/metrics"
 	"github.com/filecoin-project/lotus/node/modules/dtypes"

 	"github.com/raulk/clock"
@@ -577,7 +578,7 @@ func (mp *MessagePool) addLocal(ctx context.Context, m *types.SignedMessage) err
 	return nil
 }

-// verifyMsgBeforeAdd verifies that the message meets the minimum criteria for block inclusio
+// verifyMsgBeforeAdd verifies that the message meets the minimum criteria for block inclusion
 // and whether the message has enough funds to be included in the next 20 blocks.
 // If the message is not valid for block inclusion, it returns an error.
 // For local messages, if the message can be included in the next 20 blocks, it returns true to
@@ -631,6 +632,9 @@ func (mp *MessagePool) verifyMsgBeforeAdd(m *types.SignedMessage, curTs *types.T
 }

 func (mp *MessagePool) Push(ctx context.Context, m *types.SignedMessage) (cid.Cid, error) {
+	done := metrics.Timer(ctx, metrics.MpoolPushDuration)
+	defer done()
+
 	err := mp.checkMessage(m)
 	if err != nil {
 		return cid.Undef, err
@@ -697,6 +701,9 @@ func (mp *MessagePool) checkMessage(m *types.SignedMessage) error {
 }

 func (mp *MessagePool) Add(ctx context.Context, m *types.SignedMessage) error {
+	done := metrics.Timer(ctx, metrics.MpoolAddDuration)
+	defer done()
+
 	err := mp.checkMessage(m)
 	if err != nil {
 		return err
@@ -752,7 +759,7 @@ func (mp *MessagePool) VerifyMsgSig(m *types.SignedMessage) error {
 }

 func (mp *MessagePool) checkBalance(ctx context.Context, m *types.SignedMessage, curTs *types.TipSet) error {
-	balance, err := mp.getStateBalance(m.Message.From, curTs)
+	balance, err := mp.getStateBalance(ctx, m.Message.From, curTs)
 	if err != nil {
 		return xerrors.Errorf("failed to check sender balance: %s: %w", err, ErrSoftValidationFailure)
 	}
@@ -785,7 +792,10 @@ func (mp *MessagePool) checkBalance(ctx context.Context, m *types.SignedMessage,
 }

 func (mp *MessagePool) addTs(ctx context.Context, m *types.SignedMessage, curTs *types.TipSet, local, untrusted bool) (bool, error) {
-	snonce, err := mp.getStateNonce(m.Message.From, curTs)
+	done := metrics.Timer(ctx, metrics.MpoolAddTsDuration)
+	defer done()
+
+	snonce, err := mp.getStateNonce(ctx, m.Message.From, curTs)
 	if err != nil {
 		return false, xerrors.Errorf("failed to look up actor state nonce: %s: %w", err, ErrSoftValidationFailure)
 	}
@@ -833,7 +843,7 @@ func (mp *MessagePool) addLoaded(ctx context.Context, m *types.SignedMessage) er
 		return xerrors.Errorf("current tipset not loaded")
 	}

-	snonce, err := mp.getStateNonce(m.Message.From, curTs)
+	snonce, err := mp.getStateNonce(ctx, m.Message.From, curTs)
 	if err != nil {
 		return xerrors.Errorf("failed to look up actor state nonce: %s: %w", err, ErrSoftValidationFailure)
 	}
@@ -885,7 +895,7 @@ func (mp *MessagePool) addLocked(ctx context.Context, m *types.SignedMessage, st
 	}

 	if !ok {
-		nonce, err := mp.getStateNonce(m.Message.From, mp.curTs)
+		nonce, err := mp.getStateNonce(ctx, m.Message.From, mp.curTs)
 		if err != nil {
 			return xerrors.Errorf("failed to get initial actor nonce: %w", err)
 		}
@@ -946,7 +956,7 @@ func (mp *MessagePool) GetActor(_ context.Context, addr address.Address, _ types
 }

 func (mp *MessagePool) getNonceLocked(ctx context.Context, addr address.Address, curTs *types.TipSet) (uint64, error) {
-	stateNonce, err := mp.getStateNonce(addr, curTs) // sanity check
+	stateNonce, err := mp.getStateNonce(ctx, addr, curTs) // sanity check
 	if err != nil {
 		return 0, err
 	}
@@ -970,7 +980,10 @@ func (mp *MessagePool) getNonceLocked(ctx context.Context, addr address.Address,
 	return stateNonce, nil
 }

-func (mp *MessagePool) getStateNonce(addr address.Address, ts *types.TipSet) (uint64, error) {
+func (mp *MessagePool) getStateNonce(ctx context.Context, addr address.Address, ts *types.TipSet) (uint64, error) {
+	done := metrics.Timer(ctx, metrics.MpoolGetNonceDuration)
+	defer done()
+
 	act, err := mp.api.GetActorAfter(addr, ts)
 	if err != nil {
 		return 0, err
@@ -979,7 +992,10 @@ func (mp *MessagePool) getStateNonce(addr address.Address, ts *types.TipSet) (ui
 	return act.Nonce, nil
 }

-func (mp *MessagePool) getStateBalance(addr address.Address, ts *types.TipSet) (types.BigInt, error) {
+func (mp *MessagePool) getStateBalance(ctx context.Context, addr address.Address, ts *types.TipSet) (types.BigInt, error) {
+	done := metrics.Timer(ctx, metrics.MpoolGetBalanceDuration)
+	defer done()
+
 	act, err := mp.api.GetActorAfter(addr, ts)
 	if err != nil {
 		return types.EmptyInt, err
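Note: metrics.Timer here is lotus's small opencensus helper; roughly (a sketch, not the verbatim lotus code), it captures a start time and returns a closure that records the elapsed milliseconds, which is why each call site pairs it with defer done():

	import (
		"context"
		"time"

		"go.opencensus.io/stats"
	)

	// Timer starts the clock and returns a func that records elapsed ms into m.
	func Timer(ctx context.Context, m *stats.Float64Measure) func() {
		start := time.Now()
		return func() {
			stats.Record(ctx, m.M(float64(time.Since(start).Milliseconds())))
		}
	}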
chain/sub/incoming.go
@@ -507,6 +507,12 @@ func (mv *MessageValidator) Validate(ctx context.Context, pid peer.ID, msg *pubs
 		return mv.validateLocalMessage(ctx, msg)
 	}

+	start := time.Now()
+	defer func() {
+		ms := time.Now().Sub(start).Microseconds()
+		stats.Record(ctx, metrics.MessageValidationDuration.M(float64(ms)/1000))
+	}()
+
 	stats.Record(ctx, metrics.MessageReceived.M(1))
 	m, err := types.DecodeSignedMessage(msg.Message.GetData())
 	if err != nil {
@@ -538,6 +544,12 @@ func (mv *MessageValidator) Validate(ctx context.Context, pid peer.ID, msg *pubs
 			return pubsub.ValidationReject
 		}
 	}

+	ctx, _ = tag.New(
+		ctx,
+		tag.Upsert(metrics.MsgValid, "true"),
+	)
+
 	stats.Record(ctx, metrics.MessageValidationSuccess.M(1))
 	return pubsub.ValidationAccept
 }
@@ -547,6 +559,13 @@ func (mv *MessageValidator) validateLocalMessage(ctx context.Context, msg *pubsu
 		ctx,
 		tag.Upsert(metrics.Local, "true"),
 	)

+	start := time.Now()
+	defer func() {
+		ms := time.Now().Sub(start).Microseconds()
+		stats.Record(ctx, metrics.MessageValidationDuration.M(float64(ms)/1000))
+	}()
+
 	// do some lightweight validation
 	stats.Record(ctx, metrics.MessagePublished.M(1))

@@ -581,6 +600,11 @@ func (mv *MessageValidator) validateLocalMessage(ctx context.Context, msg *pubsu
 		return pubsub.ValidationIgnore
 	}

+	ctx, _ = tag.New(
+		ctx,
+		tag.Upsert(metrics.MsgValid, "true"),
+	)
+
 	stats.Record(ctx, metrics.MessageValidationSuccess.M(1))
 	return pubsub.ValidationAccept
 }
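Note: both hunks assume the duration measure and the MsgValid/Local tag keys are declared in lotus's metrics package; the declarations would look roughly like this (a sketch — the names mirror the diff, but the exact key strings and units are assumptions):

	var (
		MsgValid = tag.MustNewKey("message_valid") // assumed key name
		Local    = tag.MustNewKey("local")         // assumed key name

		MessageValidationDuration = stats.Float64("message/validation_ms",
			"Duration of message validation", stats.UnitMilliseconds)
		MessageValidationSuccess = stats.Int64("message/success",
			"Total successful message validations", stats.UnitDimensionless)
	)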
cli/chain.go
@@ -536,7 +536,7 @@ var ChainListCmd = &cli.Command{
 	Aliases: []string{"love"},
 	Usage:   "View a segment of the chain",
 	Flags: []cli.Flag{
-		&cli.Uint64Flag{Name: "height"},
+		&cli.Uint64Flag{Name: "height", DefaultText: "current head"},
 		&cli.IntFlag{Name: "count", Value: 30},
 		&cli.StringFlag{
 			Name: "format",
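Note: urfave/cli's DefaultText only changes what --help prints, not the flag's behavior: the help line becomes roughly `--height value (default: current head)` instead of `(default: 0)`, while an unset flag still yields 0 and the command presumably falls back to the chain head it fetches itself.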
cmd/lotus-shed/main.go
@@ -59,6 +59,7 @@ func main() {
 		signaturesCmd,
 		actorCmd,
 		minerTypesCmd,
+		minerMultisigsCmd,
 	}

 	app := &cli.App{
cmd/lotus-shed/miner-multisig.go (new file, 388 lines)
@@ -0,0 +1,388 @@
package main

import (
	"bytes"
	"fmt"
	"strconv"

	"github.com/filecoin-project/go-state-types/abi"
	miner5 "github.com/filecoin-project/specs-actors/v5/actors/builtin/miner"

	msig5 "github.com/filecoin-project/specs-actors/v5/actors/builtin/multisig"

	"github.com/filecoin-project/go-address"
	"github.com/filecoin-project/go-state-types/big"
	"github.com/filecoin-project/lotus/build"
	"github.com/filecoin-project/lotus/chain/actors"
	"github.com/filecoin-project/lotus/chain/actors/builtin/miner"
	"github.com/filecoin-project/lotus/chain/types"
	lcli "github.com/filecoin-project/lotus/cli"
	"github.com/urfave/cli/v2"
	"golang.org/x/xerrors"
)

var minerMultisigsCmd = &cli.Command{
	Name:        "miner-multisig",
	Description: "a collection of utilities for using multisigs as owner addresses of miners",
	Subcommands: []*cli.Command{
		mmProposeWithdrawBalance,
		mmApproveWithdrawBalance,
		mmProposeChangeOwner,
		mmApproveChangeOwner,
	},
	Flags: []cli.Flag{
		&cli.StringFlag{
			Name:     "from",
			Usage:    "specify address to send message from",
			Required: true,
		},
		&cli.StringFlag{
			Name:     "multisig",
			Usage:    "specify multisig that will receive the message",
			Required: true,
		},
		&cli.StringFlag{
			Name:     "miner",
			Usage:    "specify miner being acted upon",
			Required: true,
		},
	},
}

var mmProposeWithdrawBalance = &cli.Command{
	Name:      "propose-withdraw",
	Usage:     "Propose to withdraw FIL from the miner",
	ArgsUsage: "[amount]",
	Action: func(cctx *cli.Context) error {
		if !cctx.Args().Present() {
			return fmt.Errorf("must pass amount to withdraw")
		}

		api, closer, err := lcli.GetFullNodeAPI(cctx)
		if err != nil {
			return err
		}
		defer closer()

		ctx := lcli.ReqContext(cctx)

		multisigAddr, sender, minerAddr, err := getInputs(cctx)
		if err != nil {
			return err
		}

		val, err := types.ParseFIL(cctx.Args().First())
		if err != nil {
			return err
		}

		sp, err := actors.SerializeParams(&miner5.WithdrawBalanceParams{
			AmountRequested: abi.TokenAmount(val),
		})
		if err != nil {
			return err
		}

		pcid, err := api.MsigPropose(ctx, multisigAddr, minerAddr, big.Zero(), sender, uint64(miner.Methods.WithdrawBalance), sp)
		if err != nil {
			return xerrors.Errorf("proposing message: %w", err)
		}

		fmt.Fprintln(cctx.App.Writer, "Propose Message CID:", pcid)

		// wait for it to get mined into a block
		wait, err := api.StateWaitMsg(ctx, pcid, build.MessageConfidence)
		if err != nil {
			return err
		}

		// check it executed successfully
		if wait.Receipt.ExitCode != 0 {
			fmt.Fprintln(cctx.App.Writer, "Propose withdrawal tx failed!")
			return xerrors.Errorf("propose withdrawal tx failed, exit code: %d", wait.Receipt.ExitCode)
		}

		var retval msig5.ProposeReturn
		if err := retval.UnmarshalCBOR(bytes.NewReader(wait.Receipt.Return)); err != nil {
			return fmt.Errorf("failed to unmarshal propose return value: %w", err)
		}

		fmt.Printf("Transaction ID: %d\n", retval.TxnID)
		if retval.Applied {
			fmt.Printf("Transaction was executed during propose\n")
			fmt.Printf("Exit Code: %d\n", retval.Code)
			fmt.Printf("Return Value: %x\n", retval.Ret)
		}

		return nil
	},
}

var mmApproveWithdrawBalance = &cli.Command{
	Name:      "approve-withdraw",
	Usage:     "Approve to withdraw FIL from the miner",
	ArgsUsage: "[amount txnId proposer]",
	Action: func(cctx *cli.Context) error {
		if cctx.NArg() != 3 {
			return fmt.Errorf("must pass amount, txn Id, and proposer address")
		}

		api, closer, err := lcli.GetFullNodeAPI(cctx)
		if err != nil {
			return err
		}
		defer closer()

		ctx := lcli.ReqContext(cctx)

		multisigAddr, sender, minerAddr, err := getInputs(cctx)
		if err != nil {
			return err
		}

		val, err := types.ParseFIL(cctx.Args().First())
		if err != nil {
			return err
		}

		sp, err := actors.SerializeParams(&miner5.WithdrawBalanceParams{
			AmountRequested: abi.TokenAmount(val),
		})
		if err != nil {
			return err
		}

		txid, err := strconv.ParseUint(cctx.Args().Get(1), 10, 64)
		if err != nil {
			return err
		}

		proposer, err := address.NewFromString(cctx.Args().Get(2))
		if err != nil {
			return err
		}

		acid, err := api.MsigApproveTxnHash(ctx, multisigAddr, txid, proposer, minerAddr, big.Zero(), sender, uint64(miner.Methods.WithdrawBalance), sp)
		if err != nil {
			return xerrors.Errorf("approving message: %w", err)
		}

		fmt.Fprintln(cctx.App.Writer, "Approve Message CID:", acid)

		// wait for it to get mined into a block
		wait, err := api.StateWaitMsg(ctx, acid, build.MessageConfidence)
		if err != nil {
			return err
		}

		// check it executed successfully
		if wait.Receipt.ExitCode != 0 {
			fmt.Fprintln(cctx.App.Writer, "Approve withdrawal tx failed!")
			return xerrors.Errorf("approve withdrawal tx failed, exit code: %d", wait.Receipt.ExitCode)
		}

		var retval msig5.ApproveReturn
		if err := retval.UnmarshalCBOR(bytes.NewReader(wait.Receipt.Return)); err != nil {
			return fmt.Errorf("failed to unmarshal approve return value: %w", err)
		}

		if retval.Applied {
			fmt.Printf("Transaction was executed with this approval\n")
			fmt.Printf("Exit Code: %d\n", retval.Code)
			fmt.Printf("Return Value: %x\n", retval.Ret)
		} else {
			fmt.Println("Transaction was approved, but not executed")
		}
		return nil
	},
}

var mmProposeChangeOwner = &cli.Command{
	Name:      "propose-change-owner",
	Usage:     "Propose an owner address change",
	ArgsUsage: "[newOwner]",
	Action: func(cctx *cli.Context) error {
		if !cctx.Args().Present() {
			return fmt.Errorf("must pass new owner address")
		}

		api, closer, err := lcli.GetFullNodeAPI(cctx)
		if err != nil {
			return err
		}
		defer closer()

		ctx := lcli.ReqContext(cctx)

		multisigAddr, sender, minerAddr, err := getInputs(cctx)
		if err != nil {
			return err
		}

		na, err := address.NewFromString(cctx.Args().First())
		if err != nil {
			return err
		}

		newAddr, err := api.StateLookupID(ctx, na, types.EmptyTSK)
		if err != nil {
			return err
		}

		mi, err := api.StateMinerInfo(ctx, minerAddr, types.EmptyTSK)
		if err != nil {
			return err
		}

		if mi.Owner == newAddr {
			return fmt.Errorf("owner address already set to %s", na)
		}

		sp, err := actors.SerializeParams(&newAddr)
		if err != nil {
			return xerrors.Errorf("serializing params: %w", err)
		}

		pcid, err := api.MsigPropose(ctx, multisigAddr, minerAddr, big.Zero(), sender, uint64(miner.Methods.ChangeOwnerAddress), sp)
		if err != nil {
			return xerrors.Errorf("proposing message: %w", err)
		}

		fmt.Fprintln(cctx.App.Writer, "Propose Message CID:", pcid)

		// wait for it to get mined into a block
		wait, err := api.StateWaitMsg(ctx, pcid, build.MessageConfidence)
		if err != nil {
			return err
		}

		// check it executed successfully
		if wait.Receipt.ExitCode != 0 {
			fmt.Fprintln(cctx.App.Writer, "Propose owner change tx failed!")
			return xerrors.Errorf("propose owner change tx failed, exit code: %d", wait.Receipt.ExitCode)
		}

		var retval msig5.ProposeReturn
		if err := retval.UnmarshalCBOR(bytes.NewReader(wait.Receipt.Return)); err != nil {
			return fmt.Errorf("failed to unmarshal propose return value: %w", err)
		}

		fmt.Printf("Transaction ID: %d\n", retval.TxnID)
		if retval.Applied {
			fmt.Printf("Transaction was executed during propose\n")
			fmt.Printf("Exit Code: %d\n", retval.Code)
			fmt.Printf("Return Value: %x\n", retval.Ret)
		}
		return nil
	},
}

var mmApproveChangeOwner = &cli.Command{
	Name:      "approve-change-owner",
	Usage:     "Approve an owner address change",
	ArgsUsage: "[newOwner txnId proposer]",
	Action: func(cctx *cli.Context) error {
		if cctx.NArg() != 3 {
			return fmt.Errorf("must pass new owner address, txn Id, and proposer address")
		}

		api, closer, err := lcli.GetFullNodeAPI(cctx)
		if err != nil {
			return err
		}
		defer closer()

		ctx := lcli.ReqContext(cctx)

		multisigAddr, sender, minerAddr, err := getInputs(cctx)
		if err != nil {
			return err
		}

		na, err := address.NewFromString(cctx.Args().First())
		if err != nil {
			return err
		}

		newAddr, err := api.StateLookupID(ctx, na, types.EmptyTSK)
		if err != nil {
			return err
		}

		txid, err := strconv.ParseUint(cctx.Args().Get(1), 10, 64)
		if err != nil {
			return err
		}

		proposer, err := address.NewFromString(cctx.Args().Get(2))
		if err != nil {
			return err
		}

		mi, err := api.StateMinerInfo(ctx, minerAddr, types.EmptyTSK)
		if err != nil {
			return err
		}

		if mi.Owner == newAddr {
			return fmt.Errorf("owner address already set to %s", na)
		}

		sp, err := actors.SerializeParams(&newAddr)
		if err != nil {
			return xerrors.Errorf("serializing params: %w", err)
		}

		acid, err := api.MsigApproveTxnHash(ctx, multisigAddr, txid, proposer, minerAddr, big.Zero(), sender, uint64(miner.Methods.ChangeOwnerAddress), sp)
		if err != nil {
			return xerrors.Errorf("approving message: %w", err)
		}

		fmt.Fprintln(cctx.App.Writer, "Approve Message CID:", acid)

		// wait for it to get mined into a block
		wait, err := api.StateWaitMsg(ctx, acid, build.MessageConfidence)
		if err != nil {
			return err
		}

		// check it executed successfully
		if wait.Receipt.ExitCode != 0 {
			fmt.Fprintln(cctx.App.Writer, "Approve owner change tx failed!")
			return xerrors.Errorf("approve owner change tx failed, exit code: %d", wait.Receipt.ExitCode)
		}

		var retval msig5.ApproveReturn
		if err := retval.UnmarshalCBOR(bytes.NewReader(wait.Receipt.Return)); err != nil {
			return fmt.Errorf("failed to unmarshal approve return value: %w", err)
		}

		if retval.Applied {
			fmt.Printf("Transaction was executed with this approval\n")
			fmt.Printf("Exit Code: %d\n", retval.Code)
			fmt.Printf("Return Value: %x\n", retval.Ret)
		} else {
			fmt.Println("Transaction was approved, but not executed")
		}
		return nil
	},
}

func getInputs(cctx *cli.Context) (address.Address, address.Address, address.Address, error) {
	multisigAddr, err := address.NewFromString(cctx.String("multisig"))
	if err != nil {
		return address.Undef, address.Undef, address.Undef, err
	}

	sender, err := address.NewFromString(cctx.String("from"))
	if err != nil {
		return address.Undef, address.Undef, address.Undef, err
	}

	minerAddr, err := address.NewFromString(cctx.String("miner"))
	if err != nil {
		return address.Undef, address.Undef, address.Undef, err
	}

	return multisigAddr, sender, minerAddr, nil
}
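Note: a hypothetical session with the new subcommands (all addresses, amounts, and IDs below are placeholders; the transaction ID and proposer address for the approve step come from the propose step's output, and the approve must pass the same amount so the serialized params hash matches):

	lotus-shed miner-multisig propose-withdraw --from t3qq... --multisig t01000 --miner t01234 10fil
	lotus-shed miner-multisig approve-withdraw --from t3ww... --multisig t01000 --miner t01234 10fil 0 t3qq...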
@@ -295,6 +295,7 @@ var stateList = []stateMeta{
 	{col: color.FgYellow, state: sealing.PreCommitBatchWait},
 	{col: color.FgYellow, state: sealing.WaitSeed},
 	{col: color.FgYellow, state: sealing.Committing},
+	{col: color.FgYellow, state: sealing.CommitFinalize},
 	{col: color.FgYellow, state: sealing.SubmitCommit},
 	{col: color.FgYellow, state: sealing.CommitWait},
 	{col: color.FgYellow, state: sealing.SubmitCommitAggregate},
@@ -315,6 +316,7 @@ var stateList = []stateMeta{
 	{col: color.FgRed, state: sealing.PreCommitFailed},
 	{col: color.FgRed, state: sealing.ComputeProofFailed},
 	{col: color.FgRed, state: sealing.CommitFailed},
+	{col: color.FgRed, state: sealing.CommitFinalizeFailed},
 	{col: color.FgRed, state: sealing.PackingFailed},
 	{col: color.FgRed, state: sealing.FinalizeFailed},
 	{col: color.FgRed, state: sealing.Faulty},
@@ -2505,7 +2505,7 @@ using both transaction ID and a hash of the parameters used in the
 proposal. This method of approval can be used to ensure you only approve
 exactly the transaction you think you are.
 It takes the following params: <multisig address>, <proposed message ID>, <proposer address>, <recipient address>, <value to transfer>,
-<sender address of the approve msg>, <method to call in the proposed message>, <params to include in the proposed message>
+<sender address of the approve msg>, <method to call in the approved message>, <params to include in the proposed message>


 Perms: sign
extern/filecoin-ffi (submodule, vendored)
@@ -1 +1 @@
-Subproject commit 1c7190dcc5bdef8042ca091129d6d3c10898dbdb
+Subproject commit d2e3aa7d61501d69bed6e898de13d1312b021e62
extern/sector-storage/ffiwrapper/sealer_test.go (vendored, 85 lines changed)
@@ -31,6 +31,7 @@ import (
 	"github.com/filecoin-project/specs-storage/storage"

 	ffi "github.com/filecoin-project/filecoin-ffi"
+	"github.com/filecoin-project/filecoin-ffi/generated"

 	"github.com/filecoin-project/lotus/chain/actors/policy"
 	"github.com/filecoin-project/lotus/extern/sector-storage/ffiwrapper/basicfs"
@@ -853,3 +854,87 @@ func TestAddPiece512MPadded(t *testing.T) {

 	require.Equal(t, "baga6ea4seaqonenxyku4o7hr5xkzbqsceipf6xgli3on54beqbk6k246sbooobq", c.PieceCID.String())
 }
+
+func setupLogger(t *testing.T) *bytes.Buffer {
+	_ = os.Setenv("RUST_LOG", "info")
+
+	var bb bytes.Buffer
+	r, w, err := os.Pipe()
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	go func() {
+		_, _ = io.Copy(&bb, r)
+		runtime.KeepAlive(w)
+	}()
+
+	resp := generated.FilInitLogFd(int32(w.Fd()))
+	resp.Deref()
+
+	defer generated.FilDestroyInitLogFdResponse(resp)
+
+	if resp.StatusCode != generated.FCPResponseStatusFCPNoError {
+		t.Fatal(generated.RawString(resp.ErrorMsg).Copy())
+	}
+
+	return &bb
+}
+
+func TestMulticoreSDR(t *testing.T) {
+	if os.Getenv("TEST_RUSTPROOFS_LOGS") != "1" {
+		t.Skip("skipping test without TEST_RUSTPROOFS_LOGS=1")
+	}
+
+	rustLogger := setupLogger(t)
+
+	getGrothParamFileAndVerifyingKeys(sectorSize)
+
+	dir, err := ioutil.TempDir("", "sbtest")
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	miner := abi.ActorID(123)
+
+	sp := &basicfs.Provider{
+		Root: dir,
+	}
+	sb, err := New(sp)
+	if err != nil {
+		t.Fatalf("%+v", err)
+	}
+
+	cleanup := func() {
+		if t.Failed() {
+			fmt.Printf("not removing %s\n", dir)
+			return
+		}
+		if err := os.RemoveAll(dir); err != nil {
+			t.Error(err)
+		}
+	}
+	defer cleanup()
+
+	si := storage.SectorRef{
+		ID:        abi.SectorID{Miner: miner, Number: 1},
+		ProofType: sealProofType,
+	}
+
+	s := seal{ref: si}
+
+	// check multicore
+	_ = os.Setenv("FIL_PROOFS_USE_MULTICORE_SDR", "1")
+	rustLogger.Reset()
+	s.precommit(t, sb, si, func() {})
+
+	ok := false
+	for _, s := range strings.Split(rustLogger.String(), "\n") {
+		if strings.Contains(s, "create_label::multi") {
+			ok = true
+			break
+		}
+	}
+
+	require.True(t, ok)
+}
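Note: this is the test the new check-proofs-multicore-sdr CI job above targets. Locally it would be run along the lines of `TEST_RUSTPROOFS_LOGS=1 go test -run=TestMulticoreSDR ./extern/sector-storage/ffiwrapper` (assuming the Groth parameters and verifying keys for the test sector size have been fetched first); the test asserts that rust-fil-proofs' multicore SDR path actually ran by looking for its "create_label::multi" log line.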
extern/storage-sealing/commit_batch.go (vendored, 21 lines changed)
@@ -32,6 +32,8 @@ import (

 const arp = abi.RegisteredAggregationProof_SnarkPackV1

+//go:generate go run github.com/golang/mock/mockgen -destination=mocks/mock_commit_batcher.go -package=mocks . CommitBatcherApi
+
 type CommitBatcherApi interface {
 	SendMsg(ctx context.Context, from, to address.Address, method abi.MethodNum, value, maxFee abi.TokenAmount, params []byte) (cid.Cid, error)
 	StateMinerInfo(context.Context, address.Address, TipSetToken) (miner.MinerInfo, error)
@@ -44,9 +46,9 @@ type CommitBatcherApi interface {
 }

 type AggregateInput struct {
-	spt   abi.RegisteredSealProof
-	info  proof5.AggregateSealVerifyInfo
-	proof []byte
+	Spt   abi.RegisteredSealProof
+	Info  proof5.AggregateSealVerifyInfo
+	Proof []byte
 }

 type CommitBatcher struct {
@@ -246,6 +248,8 @@ func (b *CommitBatcher) processBatch(cfg sealiface.Config) ([]sealiface.CommitBa
 			break
 		}

+		res.Sectors = append(res.Sectors, id)
+
 		sc, err := b.getSectorCollateral(id, tok)
 		if err != nil {
 			res.FailedSectors[id] = err.Error()
@@ -254,9 +258,8 @@ func (b *CommitBatcher) processBatch(cfg sealiface.Config) ([]sealiface.CommitBa

 		collateral = big.Add(collateral, sc)

-		res.Sectors = append(res.Sectors, id)
 		params.SectorNumbers.Set(uint64(id))
-		infos = append(infos, p.info)
+		infos = append(infos, p.Info)
 	}

 	sort.Slice(infos, func(i, j int) bool {
@@ -264,7 +267,7 @@ func (b *CommitBatcher) processBatch(cfg sealiface.Config) ([]sealiface.CommitBa
 	})

 	for _, info := range infos {
-		proofs = append(proofs, b.todo[info.Number].proof)
+		proofs = append(proofs, b.todo[info.Number].Proof)
 	}

 	mid, err := address.IDFromAddress(b.maddr)
@@ -274,7 +277,7 @@ func (b *CommitBatcher) processBatch(cfg sealiface.Config) ([]sealiface.CommitBa

 	params.AggregateProof, err = b.prover.AggregateSealProofs(proof5.AggregateSealVerifyProofAndInfos{
 		Miner:          abi.ActorID(mid),
-		SealProof:      b.todo[infos[0].Number].spt,
+		SealProof:      b.todo[infos[0].Number].Spt,
 		AggregateProof: arp,
 		Infos:          infos,
 	}, proofs)
@@ -362,7 +365,7 @@ func (b *CommitBatcher) processSingle(mi miner.MinerInfo, sn abi.SectorNumber, i
 	enc := new(bytes.Buffer)
 	params := &miner.ProveCommitSectorParams{
 		SectorNumber: sn,
-		Proof:        info.proof,
+		Proof:        info.Proof,
 	}

 	if err := params.MarshalCBOR(enc); err != nil {
@@ -447,7 +450,7 @@ func (b *CommitBatcher) Pending(ctx context.Context) ([]abi.SectorID, error) {
 	for _, s := range b.todo {
 		res = append(res, abi.SectorID{
 			Miner:  abi.ActorID(mid),
-			Number: s.Info.Number,
 		})
 	}
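Note: exporting Spt/Info/Proof is what lets the external test package below (sealing_test) construct AggregateInput values; unexported fields are invisible outside the sealing package. For example (illustrative values only):

	in := sealing.AggregateInput{
		Spt:   abi.RegisteredSealProof_StackedDrg32GiBV1_1, // placeholder proof type
		Info:  proof5.AggregateSealVerifyInfo{Number: 5},
		Proof: proofBytes, // placeholder
	}

The //go:generate directive likewise lets `go generate` rebuild the gomock mock of CommitBatcherApi added later in this diff.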
extern/storage-sealing/commit_batch_test.go (new file, vendored, 299 lines)
@@ -0,0 +1,299 @@
package sealing_test

import (
	"bytes"
	"context"
	"sort"
	"sync"
	"testing"
	"time"

	"github.com/golang/mock/gomock"
	"github.com/stretchr/testify/require"

	"github.com/filecoin-project/go-address"
	"github.com/filecoin-project/go-state-types/abi"
	"github.com/filecoin-project/go-state-types/big"
	"github.com/filecoin-project/go-state-types/network"
	miner5 "github.com/filecoin-project/specs-actors/v5/actors/builtin/miner"
	proof5 "github.com/filecoin-project/specs-actors/v5/actors/runtime/proof"

	"github.com/filecoin-project/lotus/api"
	"github.com/filecoin-project/lotus/chain/actors/builtin/miner"
	"github.com/filecoin-project/lotus/extern/sector-storage/ffiwrapper"
	sealing "github.com/filecoin-project/lotus/extern/storage-sealing"
	"github.com/filecoin-project/lotus/extern/storage-sealing/mocks"
	"github.com/filecoin-project/lotus/extern/storage-sealing/sealiface"
)

func TestCommitBatcher(t *testing.T) {
	t0123, err := address.NewFromString("t0123")
	require.NoError(t, err)

	ctx := context.Background()

	as := func(ctx context.Context, mi miner.MinerInfo, use api.AddrUse, goodFunds, minFunds abi.TokenAmount) (address.Address, abi.TokenAmount, error) {
		return t0123, big.Zero(), nil
	}

	maxBatch := miner5.MaxAggregatedSectors
	minBatch := miner5.MinAggregatedSectors

	cfg := func() (sealiface.Config, error) {
		return sealiface.Config{
			MaxWaitDealsSectors:       2,
			MaxSealingSectors:         0,
			MaxSealingSectorsForDeals: 0,
			WaitDealsDelay:            time.Hour * 6,
			AlwaysKeepUnsealedCopy:    true,

			BatchPreCommits:     true,
			MinPreCommitBatch:   1,
			MaxPreCommitBatch:   miner5.PreCommitSectorBatchMaxSize,
			PreCommitBatchWait:  24 * time.Hour,
			PreCommitBatchSlack: 3 * time.Hour,

			AggregateCommits: true,
			MinCommitBatch:   minBatch,
			MaxCommitBatch:   maxBatch,
			CommitBatchWait:  24 * time.Hour,
			CommitBatchSlack: 1 * time.Hour,

			TerminateBatchMin:  1,
			TerminateBatchMax:  100,
			TerminateBatchWait: 5 * time.Minute,
		}, nil
	}

	type promise func(t *testing.T)
	type action func(t *testing.T, s *mocks.MockCommitBatcherApi, pcb *sealing.CommitBatcher) promise

	actions := func(as ...action) action {
		return func(t *testing.T, s *mocks.MockCommitBatcherApi, pcb *sealing.CommitBatcher) promise {
			var ps []promise
			for _, a := range as {
				p := a(t, s, pcb)
				if p != nil {
					ps = append(ps, p)
				}
			}

			if len(ps) > 0 {
				return func(t *testing.T) {
					for _, p := range ps {
						p(t)
					}
				}
			}
			return nil
		}
	}

	addSector := func(sn abi.SectorNumber) action {
		return func(t *testing.T, s *mocks.MockCommitBatcherApi, pcb *sealing.CommitBatcher) promise {
			var pcres sealiface.CommitBatchRes
			var pcerr error
			done := sync.Mutex{}
			done.Lock()

			si := sealing.SectorInfo{
				SectorNumber: sn,
			}

			s.EXPECT().ChainHead(gomock.Any()).Return(nil, abi.ChainEpoch(1), nil)
			s.EXPECT().StateNetworkVersion(gomock.Any(), gomock.Any()).Return(network.Version13, nil)
			s.EXPECT().StateSectorPreCommitInfo(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(&miner.SectorPreCommitOnChainInfo{
				PreCommitDeposit: big.Zero(),
			}, nil)

			go func() {
				defer done.Unlock()
				pcres, pcerr = pcb.AddCommit(ctx, si, sealing.AggregateInput{
					Info: proof5.AggregateSealVerifyInfo{
						Number: sn,
					},
				})
			}()

			return func(t *testing.T) {
				done.Lock()
				require.NoError(t, pcerr)
				require.Empty(t, pcres.Error)
				require.Contains(t, pcres.Sectors, si.SectorNumber)
			}
		}
	}

	addSectors := func(sectors []abi.SectorNumber) action {
		as := make([]action, len(sectors))
		for i, sector := range sectors {
			as[i] = addSector(sector)
		}
		return actions(as...)
	}

	waitPending := func(n int) action {
		return func(t *testing.T, s *mocks.MockCommitBatcherApi, pcb *sealing.CommitBatcher) promise {
			require.Eventually(t, func() bool {
				p, err := pcb.Pending(ctx)
				require.NoError(t, err)
				return len(p) == n
			}, time.Second*5, 10*time.Millisecond)

			return nil
		}
	}

	expectSend := func(expect []abi.SectorNumber) action {
		return func(t *testing.T, s *mocks.MockCommitBatcherApi, pcb *sealing.CommitBatcher) promise {
			s.EXPECT().StateMinerInfo(gomock.Any(), gomock.Any(), gomock.Any()).Return(miner.MinerInfo{Owner: t0123, Worker: t0123}, nil)

			ti := len(expect)
			batch := false
			if ti >= minBatch {
				batch = true
				ti = 1
			}
			s.EXPECT().ChainHead(gomock.Any()).Return(nil, abi.ChainEpoch(1), nil)
			s.EXPECT().StateSectorPreCommitInfo(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(&miner.SectorPreCommitOnChainInfo{
				PreCommitDeposit: big.Zero(),
			}, nil).Times(len(expect))
			s.EXPECT().StateMinerInitialPledgeCollateral(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(big.Zero(), nil).Times(len(expect))
			if batch {
				s.EXPECT().StateNetworkVersion(gomock.Any(), gomock.Any()).Return(network.Version13, nil)
				s.EXPECT().ChainBaseFee(gomock.Any(), gomock.Any()).Return(big.NewInt(2000), nil)
			}

			s.EXPECT().SendMsg(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), funMatcher(func(i interface{}) bool {
				b := i.([]byte)
				if batch {
					var params miner5.ProveCommitAggregateParams
					require.NoError(t, params.UnmarshalCBOR(bytes.NewReader(b)))
					for _, number := range expect {
						set, err := params.SectorNumbers.IsSet(uint64(number))
						require.NoError(t, err)
						require.True(t, set)
					}
				} else {
					var params miner5.ProveCommitSectorParams
					require.NoError(t, params.UnmarshalCBOR(bytes.NewReader(b)))
				}
				return true
			})).Times(ti)
			return nil
		}
	}

	flush := func(expect []abi.SectorNumber) action {
		return func(t *testing.T, s *mocks.MockCommitBatcherApi, pcb *sealing.CommitBatcher) promise {
			_ = expectSend(expect)(t, s, pcb)

			batch := len(expect) >= minBatch

			r, err := pcb.Flush(ctx)
			require.NoError(t, err)
			if batch {
				require.Len(t, r, 1)
				require.Empty(t, r[0].Error)
				sort.Slice(r[0].Sectors, func(i, j int) bool {
					return r[0].Sectors[i] < r[0].Sectors[j]
				})
				require.Equal(t, expect, r[0].Sectors)
			} else {
				require.Len(t, r, len(expect))
				for _, res := range r {
					require.Len(t, res.Sectors, 1)
					require.Empty(t, res.Error)
				}
				sort.Slice(r, func(i, j int) bool {
					return r[i].Sectors[0] < r[j].Sectors[0]
				})
				for i, res := range r {
					require.Equal(t, abi.SectorNumber(i), res.Sectors[0])
				}
			}

			return nil
		}
	}

	getSectors := func(n int) []abi.SectorNumber {
		out := make([]abi.SectorNumber, n)
		for i := range out {
			out[i] = abi.SectorNumber(i)
		}
		return out
	}

	tcs := map[string]struct {
		actions []action
	}{
		"addSingle": {
			actions: []action{
				addSector(0),
				waitPending(1),
				flush([]abi.SectorNumber{0}),
			},
		},
		"addTwo": {
			actions: []action{
				addSectors(getSectors(2)),
				waitPending(2),
				flush(getSectors(2)),
			},
		},
		"addAte": {
			actions: []action{
				addSectors(getSectors(8)),
				waitPending(8),
				flush(getSectors(8)),
			},
		},
		"addMax": {
			actions: []action{
				expectSend(getSectors(maxBatch)),
				addSectors(getSectors(maxBatch)),
			},
		},
	}

	for name, tc := range tcs {
		tc := tc

		t.Run(name, func(t *testing.T) {
			// create go mock controller here
			mockCtrl := gomock.NewController(t)
			// when test is done, assert expectations on all mock objects.
			defer mockCtrl.Finish()

			// create them mocks
			pcapi := mocks.NewMockCommitBatcherApi(mockCtrl)

			pcb := sealing.NewCommitBatcher(ctx, t0123, pcapi, as, fc, cfg, &fakeProver{})

			var promises []promise

			for _, a := range tc.actions {
				p := a(t, pcapi, pcb)
				if p != nil {
					promises = append(promises, p)
				}
			}

			for _, p := range promises {
				p(t)
			}

			err := pcb.Stop(ctx)
			require.NoError(t, err)
		})
	}
}

type fakeProver struct{}

func (f fakeProver) AggregateSealProofs(aggregateInfo proof5.AggregateSealVerifyProofAndInfos, proofs [][]byte) ([]byte, error) {
	return []byte("Trust me, I'm a proof"), nil
}

var _ ffiwrapper.Prover = &fakeProver{}
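Note: fc and funMatcher are not defined in this file's excerpt; they presumably come from a sibling test file in the same package. funMatcher would be a gomock matcher built from a plain predicate, along these lines (a sketch):

	type funMatcher func(interface{}) bool

	func (m funMatcher) Matches(x interface{}) bool { return m(x) }
	func (m funMatcher) String() string             { return "fun matcher" }

The action/promise structure lets each step register its gomock expectations up front, then assert results only after the batcher's goroutines have finished.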
extern/storage-sealing/fsm.go (vendored, 17 lines changed)
@@ -103,6 +103,10 @@ var fsmPlanners = map[SectorState]func(events []statemachine.Event, state *Secto
 		on(SectorChainPreCommitFailed{}, PreCommitFailed),
 	),
 	Committing: planCommitting,
+	CommitFinalize: planOne(
+		on(SectorFinalized{}, SubmitCommit),
+		on(SectorFinalizeFailed{}, CommitFinalizeFailed),
+	),
 	SubmitCommit: planOne(
 		on(SectorCommitSubmitted{}, CommitWait),
 		on(SectorSubmitCommitAggregate{}, SubmitCommitAggregate),
@@ -151,6 +155,9 @@ var fsmPlanners = map[SectorState]func(events []statemachine.Event, state *Secto
 		on(SectorRetryComputeProof{}, Committing),
 		on(SectorSealPreCommit1Failed{}, SealPreCommit1Failed),
 	),
+	CommitFinalizeFailed: planOne(
+		on(SectorRetryFinalize{}, CommitFinalizeFailed),
+	),
 	CommitFailed: planOne(
 		on(SectorSealPreCommit1Failed{}, SealPreCommit1Failed),
 		on(SectorRetryWaitSeed{}, WaitSeed),
@@ -379,6 +386,8 @@ func (m *Sealing) plan(events []statemachine.Event, state *SectorInfo) (func(sta
 		fallthrough
 	case CommitWait:
 		return m.handleCommitWait, processed, nil
+	case CommitFinalize:
+		fallthrough
 	case FinalizeSector:
 		return m.handleFinalizeSector, processed, nil

@@ -393,6 +402,8 @@ func (m *Sealing) plan(events []statemachine.Event, state *SectorInfo) (func(sta
 		return m.handleComputeProofFailed, processed, nil
 	case CommitFailed:
 		return m.handleCommitFailed, processed, nil
+	case CommitFinalizeFailed:
+		fallthrough
 	case FinalizeFailed:
 		return m.handleFinalizeFailed, processed, nil
 	case PackingFailed: // DEPRECATED: remove this for the next reset
@@ -482,6 +493,9 @@ func planCommitting(events []statemachine.Event, state *SectorInfo) (uint64, err
 		case SectorCommitted: // the normal case
 			e.apply(state)
 			state.State = SubmitCommit
+		case SectorProofReady: // early finalize
+			e.apply(state)
+			state.State = CommitFinalize
 		case SectorSeedReady: // seed changed :/
 			if e.SeedEpoch == state.SeedEpoch && bytes.Equal(e.SeedValue, state.SeedValue) {
 				log.Warnf("planCommitting: got SectorSeedReady, but the seed didn't change")
@@ -508,6 +522,8 @@ func planCommitting(events []statemachine.Event, state *SectorInfo) (uint64, err
 }

 func (m *Sealing) restartSectors(ctx context.Context) error {
+	defer m.startupWait.Done()
+
 	trackedSectors, err := m.ListSectors()
 	if err != nil {
 		log.Errorf("loading sector list: %+v", err)
@@ -525,6 +541,7 @@ func (m *Sealing) restartSectors(ctx context.Context) error {
 }

 func (m *Sealing) ForceSectorState(ctx context.Context, id abi.SectorNumber, state SectorState) error {
+	m.startupWait.Wait()
 	return m.sectors.Send(id, SectorForceState{state})
 }
extern/storage-sealing/fsm_events.go (vendored, 9 lines changed)
@@ -245,6 +245,15 @@ func (evt SectorCommitted) apply(state *SectorInfo) {
 	state.Proof = evt.Proof
 }

+// like SectorCommitted, but finalizes before sending the proof to the chain
+type SectorProofReady struct {
+	Proof []byte
+}
+
+func (evt SectorProofReady) apply(state *SectorInfo) {
+	state.Proof = evt.Proof
+}
+
 type SectorSubmitCommitAggregate struct{}

 func (evt SectorSubmitCommitAggregate) apply(*SectorInfo) {}
extern/storage-sealing/fsm_test.go (vendored, 67 lines changed)
@@ -87,6 +87,73 @@ func TestHappyPath(t *testing.T) {
 	}
 }

+func TestHappyPathFinalizeEarly(t *testing.T) {
+	var notif []struct{ before, after SectorInfo }
+	ma, _ := address.NewIDAddress(55151)
+	m := test{
+		s: &Sealing{
+			maddr: ma,
+			stats: SectorStats{
+				bySector: map[abi.SectorID]statSectorState{},
+			},
+			notifee: func(before, after SectorInfo) {
+				notif = append(notif, struct{ before, after SectorInfo }{before, after})
+			},
+		},
+		t:     t,
+		state: &SectorInfo{State: Packing},
+	}
+
+	m.planSingle(SectorPacked{})
+	require.Equal(m.t, m.state.State, GetTicket)
+
+	m.planSingle(SectorTicket{})
+	require.Equal(m.t, m.state.State, PreCommit1)
+
+	m.planSingle(SectorPreCommit1{})
+	require.Equal(m.t, m.state.State, PreCommit2)
+
+	m.planSingle(SectorPreCommit2{})
+	require.Equal(m.t, m.state.State, PreCommitting)
+
+	m.planSingle(SectorPreCommitted{})
+	require.Equal(m.t, m.state.State, PreCommitWait)
+
+	m.planSingle(SectorPreCommitLanded{})
+	require.Equal(m.t, m.state.State, WaitSeed)
+
+	m.planSingle(SectorSeedReady{})
+	require.Equal(m.t, m.state.State, Committing)
+
+	m.planSingle(SectorProofReady{})
+	require.Equal(m.t, m.state.State, CommitFinalize)
+
+	m.planSingle(SectorFinalized{})
+	require.Equal(m.t, m.state.State, SubmitCommit)
+
+	m.planSingle(SectorSubmitCommitAggregate{})
+	require.Equal(m.t, m.state.State, SubmitCommitAggregate)
+
+	m.planSingle(SectorCommitAggregateSent{})
+	require.Equal(m.t, m.state.State, CommitWait)
+
+	m.planSingle(SectorProving{})
+	require.Equal(m.t, m.state.State, FinalizeSector)
+
+	m.planSingle(SectorFinalized{})
+	require.Equal(m.t, m.state.State, Proving)
+
+	expected := []SectorState{Packing, GetTicket, PreCommit1, PreCommit2, PreCommitting, PreCommitWait, WaitSeed, Committing, CommitFinalize, SubmitCommit, SubmitCommitAggregate, CommitWait, FinalizeSector, Proving}
+	for i, n := range notif {
+		if n.before.State != expected[i] {
+			t.Fatalf("expected before state: %s, got: %s", expected[i], n.before.State)
+		}
+		if n.after.State != expected[i+1] {
+			t.Fatalf("expected after state: %s, got: %s", expected[i+1], n.after.State)
+		}
+	}
+}
+
 func TestSeedRevert(t *testing.T) {
 	ma, _ := address.NewIDAddress(55151)
 	m := test{
extern/storage-sealing/garbage.go (vendored, 2 lines changed)
@@ -9,6 +9,8 @@ import (
 )

 func (m *Sealing) PledgeSector(ctx context.Context) (storage.SectorRef, error) {
+	m.startupWait.Wait()
+
 	m.inputLk.Lock()
 	defer m.inputLk.Unlock()
extern/storage-sealing/input.go (vendored, 3 lines changed)
@@ -395,6 +395,7 @@ func (m *Sealing) updateInput(ctx context.Context, sp abi.RegisteredSealProof) e
 }

 func (m *Sealing) tryCreateDealSector(ctx context.Context, sp abi.RegisteredSealProof) error {
+	m.startupWait.Wait()
 	if m.creating != nil {
 		return nil // new sector is being created right now
 	}
@@ -447,7 +448,9 @@ func (m *Sealing) createSector(ctx context.Context, cfg sealiface.Config, sp abi
 }

 func (m *Sealing) StartPacking(sid abi.SectorNumber) error {
+	m.startupWait.Wait()
+
 	log.Infow("starting to seal deal sector", "sector", sid, "trigger", "user")

 	return m.sectors.Send(uint64(sid), SectorStartPacking{})
 }
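Note: together with the fsm.go hunk above (restartSectors defers startupWait.Done()), these Wait() calls make user-triggered operations block until sector-state restart has finished on startup. The assumed wiring is a sync.WaitGroup armed before the restart goroutine runs (a sketch; the initialization is not shown in this diff):

	type Sealing struct {
		startupWait sync.WaitGroup
		// ...
	}

	// at construction time (assumed):
	m.startupWait.Add(1)     // armed before the restart goroutine runs
	go m.restartSectors(ctx) // its deferred Done() releases all Wait()ers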
extern/storage-sealing/mocks/mock_commit_batcher.go (new file, vendored, 149 lines)
@ -0,0 +1,149 @@
|
||||
// Code generated by MockGen. DO NOT EDIT.
|
||||
// Source: github.com/filecoin-project/lotus/extern/storage-sealing (interfaces: CommitBatcherApi)
|
||||
|
||||
// Package mocks is a generated GoMock package.
|
||||
package mocks
|
||||
|
||||
import (
|
||||
context "context"
|
||||
reflect "reflect"
|
||||
|
||||
address "github.com/filecoin-project/go-address"
|
||||
abi "github.com/filecoin-project/go-state-types/abi"
|
||||
big "github.com/filecoin-project/go-state-types/big"
|
||||
network "github.com/filecoin-project/go-state-types/network"
|
||||
miner "github.com/filecoin-project/lotus/chain/actors/builtin/miner"
|
||||
sealing "github.com/filecoin-project/lotus/extern/storage-sealing"
|
||||
miner0 "github.com/filecoin-project/specs-actors/actors/builtin/miner"
|
||||
gomock "github.com/golang/mock/gomock"
|
||||
cid "github.com/ipfs/go-cid"
|
||||
)
|
||||
|
||||
// MockCommitBatcherApi is a mock of CommitBatcherApi interface.
|
||||
type MockCommitBatcherApi struct {
|
||||
ctrl *gomock.Controller
|
||||
recorder *MockCommitBatcherApiMockRecorder
|
||||
}
|
||||
|
||||
// MockCommitBatcherApiMockRecorder is the mock recorder for MockCommitBatcherApi.
|
||||
type MockCommitBatcherApiMockRecorder struct {
|
||||
mock *MockCommitBatcherApi
|
||||
}
|
||||
|
||||
// NewMockCommitBatcherApi creates a new mock instance.
|
||||
func NewMockCommitBatcherApi(ctrl *gomock.Controller) *MockCommitBatcherApi {
|
||||
mock := &MockCommitBatcherApi{ctrl: ctrl}
|
||||
mock.recorder = &MockCommitBatcherApiMockRecorder{mock}
|
||||
return mock
|
||||
}
|
||||
|
||||
// EXPECT returns an object that allows the caller to indicate expected use.
|
||||
func (m *MockCommitBatcherApi) EXPECT() *MockCommitBatcherApiMockRecorder {
|
||||
return m.recorder
|
||||
}
|
||||
|
||||
// ChainBaseFee mocks base method.
|
||||
func (m *MockCommitBatcherApi) ChainBaseFee(arg0 context.Context, arg1 sealing.TipSetToken) (big.Int, error) {
|
||||
m.ctrl.T.Helper()
|
||||
ret := m.ctrl.Call(m, "ChainBaseFee", arg0, arg1)
|
||||
ret0, _ := ret[0].(big.Int)
|
||||
ret1, _ := ret[1].(error)
|
||||
return ret0, ret1
|
||||
}
|
||||
|
||||
// ChainBaseFee indicates an expected call of ChainBaseFee.
|
||||
func (mr *MockCommitBatcherApiMockRecorder) ChainBaseFee(arg0, arg1 interface{}) *gomock.Call {
|
||||
mr.mock.ctrl.T.Helper()
|
||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ChainBaseFee", reflect.TypeOf((*MockCommitBatcherApi)(nil).ChainBaseFee), arg0, arg1)
|
||||
}
|
||||
|
||||
// ChainHead mocks base method.
|
||||
func (m *MockCommitBatcherApi) ChainHead(arg0 context.Context) (sealing.TipSetToken, abi.ChainEpoch, error) {
|
||||
m.ctrl.T.Helper()
|
||||
ret := m.ctrl.Call(m, "ChainHead", arg0)
|
||||
ret0, _ := ret[0].(sealing.TipSetToken)
|
||||
ret1, _ := ret[1].(abi.ChainEpoch)
|
||||
ret2, _ := ret[2].(error)
|
||||
return ret0, ret1, ret2
|
||||
}
|
||||
|
||||
// ChainHead indicates an expected call of ChainHead.
|
||||
func (mr *MockCommitBatcherApiMockRecorder) ChainHead(arg0 interface{}) *gomock.Call {
	mr.mock.ctrl.T.Helper()
	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ChainHead", reflect.TypeOf((*MockCommitBatcherApi)(nil).ChainHead), arg0)
}

// SendMsg mocks base method.
func (m *MockCommitBatcherApi) SendMsg(arg0 context.Context, arg1, arg2 address.Address, arg3 abi.MethodNum, arg4, arg5 big.Int, arg6 []byte) (cid.Cid, error) {
	m.ctrl.T.Helper()
	ret := m.ctrl.Call(m, "SendMsg", arg0, arg1, arg2, arg3, arg4, arg5, arg6)
	ret0, _ := ret[0].(cid.Cid)
	ret1, _ := ret[1].(error)
	return ret0, ret1
}

// SendMsg indicates an expected call of SendMsg.
func (mr *MockCommitBatcherApiMockRecorder) SendMsg(arg0, arg1, arg2, arg3, arg4, arg5, arg6 interface{}) *gomock.Call {
	mr.mock.ctrl.T.Helper()
	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SendMsg", reflect.TypeOf((*MockCommitBatcherApi)(nil).SendMsg), arg0, arg1, arg2, arg3, arg4, arg5, arg6)
}

// StateMinerInfo mocks base method.
func (m *MockCommitBatcherApi) StateMinerInfo(arg0 context.Context, arg1 address.Address, arg2 sealing.TipSetToken) (miner.MinerInfo, error) {
	m.ctrl.T.Helper()
	ret := m.ctrl.Call(m, "StateMinerInfo", arg0, arg1, arg2)
	ret0, _ := ret[0].(miner.MinerInfo)
	ret1, _ := ret[1].(error)
	return ret0, ret1
}

// StateMinerInfo indicates an expected call of StateMinerInfo.
func (mr *MockCommitBatcherApiMockRecorder) StateMinerInfo(arg0, arg1, arg2 interface{}) *gomock.Call {
	mr.mock.ctrl.T.Helper()
	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "StateMinerInfo", reflect.TypeOf((*MockCommitBatcherApi)(nil).StateMinerInfo), arg0, arg1, arg2)
}

// StateMinerInitialPledgeCollateral mocks base method.
func (m *MockCommitBatcherApi) StateMinerInitialPledgeCollateral(arg0 context.Context, arg1 address.Address, arg2 miner0.SectorPreCommitInfo, arg3 sealing.TipSetToken) (big.Int, error) {
	m.ctrl.T.Helper()
	ret := m.ctrl.Call(m, "StateMinerInitialPledgeCollateral", arg0, arg1, arg2, arg3)
	ret0, _ := ret[0].(big.Int)
	ret1, _ := ret[1].(error)
	return ret0, ret1
}

// StateMinerInitialPledgeCollateral indicates an expected call of StateMinerInitialPledgeCollateral.
func (mr *MockCommitBatcherApiMockRecorder) StateMinerInitialPledgeCollateral(arg0, arg1, arg2, arg3 interface{}) *gomock.Call {
	mr.mock.ctrl.T.Helper()
	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "StateMinerInitialPledgeCollateral", reflect.TypeOf((*MockCommitBatcherApi)(nil).StateMinerInitialPledgeCollateral), arg0, arg1, arg2, arg3)
}

// StateNetworkVersion mocks base method.
func (m *MockCommitBatcherApi) StateNetworkVersion(arg0 context.Context, arg1 sealing.TipSetToken) (network.Version, error) {
	m.ctrl.T.Helper()
	ret := m.ctrl.Call(m, "StateNetworkVersion", arg0, arg1)
	ret0, _ := ret[0].(network.Version)
	ret1, _ := ret[1].(error)
	return ret0, ret1
}

// StateNetworkVersion indicates an expected call of StateNetworkVersion.
func (mr *MockCommitBatcherApiMockRecorder) StateNetworkVersion(arg0, arg1 interface{}) *gomock.Call {
	mr.mock.ctrl.T.Helper()
	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "StateNetworkVersion", reflect.TypeOf((*MockCommitBatcherApi)(nil).StateNetworkVersion), arg0, arg1)
}

// StateSectorPreCommitInfo mocks base method.
func (m *MockCommitBatcherApi) StateSectorPreCommitInfo(arg0 context.Context, arg1 address.Address, arg2 abi.SectorNumber, arg3 sealing.TipSetToken) (*miner.SectorPreCommitOnChainInfo, error) {
	m.ctrl.T.Helper()
	ret := m.ctrl.Call(m, "StateSectorPreCommitInfo", arg0, arg1, arg2, arg3)
	ret0, _ := ret[0].(*miner.SectorPreCommitOnChainInfo)
	ret1, _ := ret[1].(error)
	return ret0, ret1
}

// StateSectorPreCommitInfo indicates an expected call of StateSectorPreCommitInfo.
func (mr *MockCommitBatcherApiMockRecorder) StateSectorPreCommitInfo(arg0, arg1, arg2, arg3 interface{}) *gomock.Call {
	mr.mock.ctrl.T.Helper()
	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "StateSectorPreCommitInfo", reflect.TypeOf((*MockCommitBatcherApi)(nil).StateSectorPreCommitInfo), arg0, arg1, arg2, arg3)
}
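A minimal sketch, not part of the diff, of how a generated mock/recorder pair like the one above is typically driven from a test. gomock and the mocks package path are taken from this commit; the ChainHead signature is assumed to match the PreCommitBatcherApi one shown later, and the test name is hypothetical.

package mocks_test

import (
	"context"
	"testing"

	"github.com/filecoin-project/go-state-types/abi"
	"github.com/golang/mock/gomock"

	sealing "github.com/filecoin-project/lotus/extern/storage-sealing"
	"github.com/filecoin-project/lotus/extern/storage-sealing/mocks"
)

func TestChainHeadStub(t *testing.T) {
	ctrl := gomock.NewController(t)
	defer ctrl.Finish() // verifies every expectation armed below was met

	api := mocks.NewMockCommitBatcherApi(ctrl)
	// The recorder arms one expected ChainHead call; Return stubs the
	// values the base method hands back to the code under test.
	api.EXPECT().ChainHead(gomock.Any()).Return(sealing.TipSetToken(nil), abi.ChainEpoch(10), nil)

	_, epoch, err := api.ChainHead(context.Background())
	if err != nil || epoch != 10 {
		t.Fatalf("unexpected stubbed result: epoch=%d err=%v", epoch, err)
	}
}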
87  extern/storage-sealing/mocks/mock_precommit_batcher.go vendored Normal file
@ -0,0 +1,87 @@
// Code generated by MockGen. DO NOT EDIT.
// Source: github.com/filecoin-project/lotus/extern/storage-sealing (interfaces: PreCommitBatcherApi)

// Package mocks is a generated GoMock package.
package mocks

import (
	context "context"
	reflect "reflect"

	address "github.com/filecoin-project/go-address"
	abi "github.com/filecoin-project/go-state-types/abi"
	big "github.com/filecoin-project/go-state-types/big"
	miner "github.com/filecoin-project/lotus/chain/actors/builtin/miner"
	sealing "github.com/filecoin-project/lotus/extern/storage-sealing"
	gomock "github.com/golang/mock/gomock"
	cid "github.com/ipfs/go-cid"
)

// MockPreCommitBatcherApi is a mock of PreCommitBatcherApi interface.
type MockPreCommitBatcherApi struct {
	ctrl     *gomock.Controller
	recorder *MockPreCommitBatcherApiMockRecorder
}

// MockPreCommitBatcherApiMockRecorder is the mock recorder for MockPreCommitBatcherApi.
type MockPreCommitBatcherApiMockRecorder struct {
	mock *MockPreCommitBatcherApi
}

// NewMockPreCommitBatcherApi creates a new mock instance.
func NewMockPreCommitBatcherApi(ctrl *gomock.Controller) *MockPreCommitBatcherApi {
	mock := &MockPreCommitBatcherApi{ctrl: ctrl}
	mock.recorder = &MockPreCommitBatcherApiMockRecorder{mock}
	return mock
}

// EXPECT returns an object that allows the caller to indicate expected use.
func (m *MockPreCommitBatcherApi) EXPECT() *MockPreCommitBatcherApiMockRecorder {
	return m.recorder
}

// ChainHead mocks base method.
func (m *MockPreCommitBatcherApi) ChainHead(arg0 context.Context) (sealing.TipSetToken, abi.ChainEpoch, error) {
	m.ctrl.T.Helper()
	ret := m.ctrl.Call(m, "ChainHead", arg0)
	ret0, _ := ret[0].(sealing.TipSetToken)
	ret1, _ := ret[1].(abi.ChainEpoch)
	ret2, _ := ret[2].(error)
	return ret0, ret1, ret2
}

// ChainHead indicates an expected call of ChainHead.
func (mr *MockPreCommitBatcherApiMockRecorder) ChainHead(arg0 interface{}) *gomock.Call {
	mr.mock.ctrl.T.Helper()
	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ChainHead", reflect.TypeOf((*MockPreCommitBatcherApi)(nil).ChainHead), arg0)
}

// SendMsg mocks base method.
func (m *MockPreCommitBatcherApi) SendMsg(arg0 context.Context, arg1, arg2 address.Address, arg3 abi.MethodNum, arg4, arg5 big.Int, arg6 []byte) (cid.Cid, error) {
	m.ctrl.T.Helper()
	ret := m.ctrl.Call(m, "SendMsg", arg0, arg1, arg2, arg3, arg4, arg5, arg6)
	ret0, _ := ret[0].(cid.Cid)
	ret1, _ := ret[1].(error)
	return ret0, ret1
}

// SendMsg indicates an expected call of SendMsg.
func (mr *MockPreCommitBatcherApiMockRecorder) SendMsg(arg0, arg1, arg2, arg3, arg4, arg5, arg6 interface{}) *gomock.Call {
	mr.mock.ctrl.T.Helper()
	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SendMsg", reflect.TypeOf((*MockPreCommitBatcherApi)(nil).SendMsg), arg0, arg1, arg2, arg3, arg4, arg5, arg6)
}

// StateMinerInfo mocks base method.
func (m *MockPreCommitBatcherApi) StateMinerInfo(arg0 context.Context, arg1 address.Address, arg2 sealing.TipSetToken) (miner.MinerInfo, error) {
	m.ctrl.T.Helper()
	ret := m.ctrl.Call(m, "StateMinerInfo", arg0, arg1, arg2)
	ret0, _ := ret[0].(miner.MinerInfo)
	ret1, _ := ret[1].(error)
	return ret0, ret1
}

// StateMinerInfo indicates an expected call of StateMinerInfo.
func (mr *MockPreCommitBatcherApiMockRecorder) StateMinerInfo(arg0, arg1, arg2 interface{}) *gomock.Call {
	mr.mock.ctrl.T.Helper()
	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "StateMinerInfo", reflect.TypeOf((*MockPreCommitBatcherApi)(nil).StateMinerInfo), arg0, arg1, arg2)
}
4  extern/storage-sealing/precommit_batch.go vendored
@ -25,6 +25,8 @@ import (
	"github.com/filecoin-project/lotus/node/config"
)

//go:generate go run github.com/golang/mock/mockgen -destination=mocks/mock_precommit_batcher.go -package=mocks . PreCommitBatcherApi

type PreCommitBatcherApi interface {
	SendMsg(ctx context.Context, from, to address.Address, method abi.MethodNum, value, maxFee abi.TokenAmount, params []byte) (cid.Cid, error)
	StateMinerInfo(context.Context, address.Address, TipSetToken) (miner.MinerInfo, error)
@ -243,7 +245,7 @@ func (b *PreCommitBatcher) processBatch(cfg sealiface.Config) ([]sealiface.PreCo

	res.Msg = &mcid

	log.Infow("Sent ProveCommitAggregate message", "cid", mcid, "from", from, "sectors", len(b.todo))
	log.Infow("Sent PreCommitSectorBatch message", "cid", mcid, "from", from, "sectors", len(b.todo))

	return []sealiface.PreCommitBatchRes{res}, nil
}
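The //go:generate directive added here is what produced mocks/mock_precommit_batcher.go in the previous file; rerunning go generate on this package should regenerate the mock whenever the PreCommitBatcherApi interface changes. (The second hunk also fixes the log line: this batcher sends PreCommitSectorBatch, not ProveCommitAggregate.)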
258  extern/storage-sealing/precommit_batch_test.go vendored Normal file
@ -0,0 +1,258 @@
package sealing_test

import (
	"bytes"
	"context"
	"sort"
	"sync"
	"testing"
	"time"

	"github.com/golang/mock/gomock"
	"github.com/stretchr/testify/require"

	"github.com/filecoin-project/go-address"
	"github.com/filecoin-project/go-state-types/abi"
	"github.com/filecoin-project/go-state-types/big"
	miner5 "github.com/filecoin-project/specs-actors/v5/actors/builtin/miner"

	"github.com/filecoin-project/lotus/api"
	"github.com/filecoin-project/lotus/chain/actors/builtin/miner"
	"github.com/filecoin-project/lotus/chain/types"
	sealing "github.com/filecoin-project/lotus/extern/storage-sealing"
	"github.com/filecoin-project/lotus/extern/storage-sealing/mocks"
	"github.com/filecoin-project/lotus/extern/storage-sealing/sealiface"
	"github.com/filecoin-project/lotus/node/config"
	miner0 "github.com/filecoin-project/specs-actors/actors/builtin/miner"
)

var fc = config.MinerFeeConfig{
	MaxPreCommitGasFee:      types.FIL(types.FromFil(1)),
	MaxCommitGasFee:         types.FIL(types.FromFil(1)),
	MaxTerminateGasFee:      types.FIL(types.FromFil(1)),
	MaxPreCommitBatchGasFee: config.BatchFeeConfig{Base: types.FIL(types.FromFil(3)), PerSector: types.FIL(types.FromFil(1))},
	MaxCommitBatchGasFee:    config.BatchFeeConfig{Base: types.FIL(types.FromFil(3)), PerSector: types.FIL(types.FromFil(1))},
}

func TestPrecommitBatcher(t *testing.T) {
	t0123, err := address.NewFromString("t0123")
	require.NoError(t, err)

	ctx := context.Background()

	as := func(ctx context.Context, mi miner.MinerInfo, use api.AddrUse, goodFunds, minFunds abi.TokenAmount) (address.Address, abi.TokenAmount, error) {
		return t0123, big.Zero(), nil
	}

	maxBatch := miner5.PreCommitSectorBatchMaxSize

	cfg := func() (sealiface.Config, error) {
		return sealiface.Config{
			MaxWaitDealsSectors:       2,
			MaxSealingSectors:         0,
			MaxSealingSectorsForDeals: 0,
			WaitDealsDelay:            time.Hour * 6,
			AlwaysKeepUnsealedCopy:    true,

			BatchPreCommits:     true,
			MinPreCommitBatch:   1,
			MaxPreCommitBatch:   maxBatch,
			PreCommitBatchWait:  24 * time.Hour,
			PreCommitBatchSlack: 3 * time.Hour,

			AggregateCommits: true,
			MinCommitBatch:   miner5.MinAggregatedSectors,
			MaxCommitBatch:   miner5.MaxAggregatedSectors,
			CommitBatchWait:  24 * time.Hour,
			CommitBatchSlack: 1 * time.Hour,

			TerminateBatchMin:  1,
			TerminateBatchMax:  100,
			TerminateBatchWait: 5 * time.Minute,
		}, nil
	}

	type promise func(t *testing.T)
	type action func(t *testing.T, s *mocks.MockPreCommitBatcherApi, pcb *sealing.PreCommitBatcher) promise

	actions := func(as ...action) action {
		return func(t *testing.T, s *mocks.MockPreCommitBatcherApi, pcb *sealing.PreCommitBatcher) promise {
			var ps []promise
			for _, a := range as {
				p := a(t, s, pcb)
				if p != nil {
					ps = append(ps, p)
				}
			}

			if len(ps) > 0 {
				return func(t *testing.T) {
					for _, p := range ps {
						p(t)
					}
				}
			}
			return nil
		}
	}

	addSector := func(sn abi.SectorNumber) action {
		return func(t *testing.T, s *mocks.MockPreCommitBatcherApi, pcb *sealing.PreCommitBatcher) promise {
			var pcres sealiface.PreCommitBatchRes
			var pcerr error
			done := sync.Mutex{}
			done.Lock()

			si := sealing.SectorInfo{
				SectorNumber: sn,
			}

			s.EXPECT().ChainHead(gomock.Any()).Return(nil, abi.ChainEpoch(1), nil)

			go func() {
				defer done.Unlock()
				pcres, pcerr = pcb.AddPreCommit(ctx, si, big.Zero(), &miner0.SectorPreCommitInfo{
					SectorNumber: si.SectorNumber,
					SealedCID:    fakePieceCid(t),
					DealIDs:      nil,
					Expiration:   0,
				})
			}()

			return func(t *testing.T) {
				done.Lock()
				require.NoError(t, pcerr)
				require.Empty(t, pcres.Error)
				require.Contains(t, pcres.Sectors, si.SectorNumber)
			}
		}
	}

	addSectors := func(sectors []abi.SectorNumber) action {
		as := make([]action, len(sectors))
		for i, sector := range sectors {
			as[i] = addSector(sector)
		}
		return actions(as...)
	}

	waitPending := func(n int) action {
		return func(t *testing.T, s *mocks.MockPreCommitBatcherApi, pcb *sealing.PreCommitBatcher) promise {
			require.Eventually(t, func() bool {
				p, err := pcb.Pending(ctx)
				require.NoError(t, err)
				return len(p) == n
			}, time.Second*5, 10*time.Millisecond)

			return nil
		}
	}

	expectSend := func(expect []abi.SectorNumber) action {
		return func(t *testing.T, s *mocks.MockPreCommitBatcherApi, pcb *sealing.PreCommitBatcher) promise {
			s.EXPECT().StateMinerInfo(gomock.Any(), gomock.Any(), gomock.Any()).Return(miner.MinerInfo{Owner: t0123, Worker: t0123}, nil)
			s.EXPECT().SendMsg(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), funMatcher(func(i interface{}) bool {
				b := i.([]byte)
				var params miner5.PreCommitSectorBatchParams
				require.NoError(t, params.UnmarshalCBOR(bytes.NewReader(b)))
				for s, number := range expect {
					require.Equal(t, number, params.Sectors[s].SectorNumber)
				}
				return true
			}))
			return nil
		}
	}

	flush := func(expect []abi.SectorNumber) action {
		return func(t *testing.T, s *mocks.MockPreCommitBatcherApi, pcb *sealing.PreCommitBatcher) promise {
			_ = expectSend(expect)(t, s, pcb)

			r, err := pcb.Flush(ctx)
			require.NoError(t, err)
			require.Len(t, r, 1)
			require.Empty(t, r[0].Error)
			sort.Slice(r[0].Sectors, func(i, j int) bool {
				return r[0].Sectors[i] < r[0].Sectors[j]
			})
			require.Equal(t, expect, r[0].Sectors)

			return nil
		}
	}

	getSectors := func(n int) []abi.SectorNumber {
		out := make([]abi.SectorNumber, n)
		for i := range out {
			out[i] = abi.SectorNumber(i)
		}
		return out
	}

	tcs := map[string]struct {
		actions []action
	}{
		"addSingle": {
			actions: []action{
				addSector(0),
				waitPending(1),
				flush([]abi.SectorNumber{0}),
			},
		},
		"addTwo": {
			actions: []action{
				addSectors(getSectors(2)),
				waitPending(2),
				flush(getSectors(2)),
			},
		},
		"addMax": {
			actions: []action{
				expectSend(getSectors(maxBatch)),
				addSectors(getSectors(maxBatch)),
			},
		},
	}

	for name, tc := range tcs {
		tc := tc

		t.Run(name, func(t *testing.T) {
			// create the gomock controller here
			mockCtrl := gomock.NewController(t)
			// when the test is done, assert the expectations on all mock objects
			defer mockCtrl.Finish()

			// create the mocks
			pcapi := mocks.NewMockPreCommitBatcherApi(mockCtrl)

			pcb := sealing.NewPreCommitBatcher(ctx, t0123, pcapi, as, fc, cfg)

			var promises []promise

			for _, a := range tc.actions {
				p := a(t, pcapi, pcb)
				if p != nil {
					promises = append(promises, p)
				}
			}

			for _, p := range promises {
				p(t)
			}

			err := pcb.Stop(ctx)
			require.NoError(t, err)
		})
	}
}

type funMatcher func(interface{}) bool

// Matches invokes the wrapped predicate, so the assertions inside it run
// against the actual SendMsg params (returning true unconditionally here
// would silently skip them).
func (fm funMatcher) Matches(v interface{}) bool {
	return fm(v)
}

func (funMatcher) String() string {
	return "fun"
}
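Two details of this test worth spelling out: each action returns an optional promise, and the driver only invokes the promises after all actions have run, which lets the asynchronous AddPreCommit goroutines unblock before anything is asserted. And in the "addMax" case the send expectation is armed before the sectors are added, because filling the pending set to miner5.PreCommitSectorBatchMaxSize (the configured MaxPreCommitBatch) makes the batcher submit immediately, with no explicit Flush.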
2  extern/storage-sealing/sealiface/config.go vendored
@ -18,6 +18,8 @@ type Config struct {
	AlwaysKeepUnsealedCopy bool

	FinalizeEarly bool

	BatchPreCommits   bool
	MaxPreCommitBatch int
	MinPreCommitBatch int
7  extern/storage-sealing/sealing.go vendored
@ -83,6 +83,8 @@ type Sealing struct {
	feeCfg config.MinerFeeConfig
	events Events

	startupWait sync.WaitGroup

	maddr address.Address

	sealer sectorstorage.SectorManager
@ -162,6 +164,7 @@ func New(api SealingAPI, fc config.MinerFeeConfig, events Events, maddr address.
			bySector: map[abi.SectorID]statSectorState{},
		},
	}
	s.startupWait.Add(1)

	s.sectors = statemachine.New(namespace.Wrap(ds, datastore.NewKey(SectorStorePrefix)), s, SectorInfo{})

@ -189,10 +192,14 @@ func (m *Sealing) Stop(ctx context.Context) error {
}

func (m *Sealing) Remove(ctx context.Context, sid abi.SectorNumber) error {
	m.startupWait.Wait()

	return m.sectors.Send(uint64(sid), SectorRemove{})
}

func (m *Sealing) Terminate(ctx context.Context, sid abi.SectorNumber) error {
	m.startupWait.Wait()

	return m.sectors.Send(uint64(sid), SectorTerminate{})
}
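Reading this hunk, the new startupWait group appears to gate the externally-callable state-machine entry points: New arms it with Add(1), and Remove/Terminate block on Wait() until startup completes (the matching Done() call is outside the hunks shown here).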
10  extern/storage-sealing/sector_state.go vendored
@ -17,6 +17,8 @@ var ExistSectorStateList = map[SectorState]struct{}{
	PreCommitBatchWait:    {},
	WaitSeed:              {},
	Committing:            {},
	CommitFinalize:        {},
	CommitFinalizeFailed:  {},
	SubmitCommit:          {},
	CommitWait:            {},
	SubmitCommitAggregate: {},
@ -63,8 +65,10 @@ const (
	SubmitPreCommitBatch SectorState = "SubmitPreCommitBatch"
	PreCommitBatchWait   SectorState = "PreCommitBatchWait"

	WaitSeed   SectorState = "WaitSeed"   // waiting for seed
	Committing SectorState = "Committing" // compute PoRep
	WaitSeed             SectorState = "WaitSeed"             // waiting for seed
	Committing           SectorState = "Committing"           // compute PoRep
	CommitFinalize       SectorState = "CommitFinalize"       // cleanup sector metadata before submitting the proof (early finalize)
	CommitFinalizeFailed SectorState = "CommitFinalizeFailed"

	// single commit
	SubmitCommit SectorState = "SubmitCommit" // send commit message to the chain
@ -106,7 +110,7 @@ func toStatState(st SectorState) statSectorState {
	switch st {
	case UndefinedSectorState, Empty, WaitDeals, AddPiece:
		return sstStaging
	case Packing, GetTicket, PreCommit1, PreCommit2, PreCommitting, PreCommitWait, SubmitPreCommitBatch, PreCommitBatchWait, WaitSeed, Committing, SubmitCommit, CommitWait, SubmitCommitAggregate, CommitAggregateWait, FinalizeSector:
	case Packing, GetTicket, PreCommit1, PreCommit2, PreCommitting, PreCommitWait, SubmitPreCommitBatch, PreCommitBatchWait, WaitSeed, Committing, CommitFinalize, SubmitCommit, CommitWait, SubmitCommitAggregate, CommitAggregateWait, FinalizeSector:
		return sstSealing
	case Proving, Removed, Removing, Terminating, TerminateWait, TerminateFinality, TerminateFailed:
		return sstProving
31  extern/storage-sealing/states_sealing.go vendored
@ -478,6 +478,11 @@ func (m *Sealing) handleCommitting(ctx statemachine.Context, sector SectorInfo)
		}
	}

	cfg, err := m.getConfig()
	if err != nil {
		return xerrors.Errorf("getting config: %w", err)
	}

	log.Info("scheduling seal proof computation...")

	log.Infof("KOMIT %d %x(%d); %x(%d); %v; r:%x; d:%x", sector.SectorNumber, sector.TicketValue, sector.TicketEpoch, sector.SeedValue, sector.SeedEpoch, sector.pieceInfos(), sector.CommR, sector.CommD)
@ -500,6 +505,24 @@ func (m *Sealing) handleCommitting(ctx statemachine.Context, sector SectorInfo)
		return ctx.Send(SectorComputeProofFailed{xerrors.Errorf("computing seal proof failed(2): %w", err)})
	}

	{
		tok, _, err := m.api.ChainHead(ctx.Context())
		if err != nil {
			log.Errorf("handleCommitting: api error, not proceeding: %+v", err)
			return nil
		}

		if err := m.checkCommit(ctx.Context(), sector, proof, tok); err != nil {
			return ctx.Send(SectorComputeProofFailed{xerrors.Errorf("commit check error: %w", err)})
		}
	}

	if cfg.FinalizeEarly {
		return ctx.Send(SectorProofReady{
			Proof: proof,
		})
	}

	return ctx.Send(SectorCommitted{
		Proof: proof,
	})
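With this hunk, handleCommitting now re-checks the freshly computed proof against the current chain head before proceeding, and when cfg.FinalizeEarly is set it emits SectorProofReady instead of SectorCommitted, routing the sector through the new CommitFinalize state added above so metadata cleanup happens before the proof is submitted.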
@ -524,7 +547,7 @@ func (m *Sealing) handleSubmitCommit(ctx statemachine.Context, sector SectorInfo

	tok, _, err := m.api.ChainHead(ctx.Context())
	if err != nil {
		log.Errorf("handleCommitting: api error, not proceeding: %+v", err)
		log.Errorf("handleSubmitCommit: api error, not proceeding: %+v", err)
		return nil
	}

@ -590,15 +613,15 @@ func (m *Sealing) handleSubmitCommitAggregate(ctx statemachine.Context, sector S
	}

	res, err := m.commiter.AddCommit(ctx.Context(), sector, AggregateInput{
		info: proof.AggregateSealVerifyInfo{
		Info: proof.AggregateSealVerifyInfo{
			Number:                sector.SectorNumber,
			Randomness:            sector.TicketValue,
			InteractiveRandomness: sector.SeedValue,
			SealedCID:             *sector.CommR,
			UnsealedCID:           *sector.CommD,
		},
		proof: sector.Proof, // todo: this correct??
		spt:   sector.SectorType,
		Proof: sector.Proof, // todo: this correct??
		Spt:   sector.SectorType,
	})
	if err != nil {
		return ctx.Send(SectorCommitFailed{xerrors.Errorf("queuing commit for aggregation failed: %w", err)})
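Besides correcting the copy-pasted "handleCommitting" log prefix, this hunk exports the AggregateInput fields (info/proof/spt become Info/Proof/Spt), presumably so code outside the sealing package, such as the new external test packages in this commit, can construct the struct.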
2  go.mod
@ -39,7 +39,7 @@ require (
	github.com/filecoin-project/go-jsonrpc v0.1.4-0.20210217175800-45ea43ac2bec
	github.com/filecoin-project/go-multistore v0.0.3
	github.com/filecoin-project/go-padreader v0.0.0-20200903213702-ed5fae088b20
	github.com/filecoin-project/go-paramfetch v0.0.2-0.20210330140417-936748d3f5ec
	github.com/filecoin-project/go-paramfetch v0.0.2-0.20210614165157-25a6c7769498
	github.com/filecoin-project/go-state-types v0.1.1-0.20210506134452-99b279731c48
	github.com/filecoin-project/go-statemachine v0.0.0-20200925024713-05bd7c71fbfe
	github.com/filecoin-project/go-statestore v0.1.1
6  go.sum
@ -298,8 +298,8 @@ github.com/filecoin-project/go-multistore v0.0.3 h1:vaRBY4YiA2UZFPK57RNuewypB8u0
github.com/filecoin-project/go-multistore v0.0.3/go.mod h1:kaNqCC4IhU4B1uyr7YWFHd23TL4KM32aChS0jNkyUvQ=
github.com/filecoin-project/go-padreader v0.0.0-20200903213702-ed5fae088b20 h1:+/4aUeUoKr6AKfPE3mBhXA5spIV6UcKdTYDPNU2Tdmg=
github.com/filecoin-project/go-padreader v0.0.0-20200903213702-ed5fae088b20/go.mod h1:mPn+LRRd5gEKNAtc+r3ScpW2JRU/pj4NBKdADYWHiak=
github.com/filecoin-project/go-paramfetch v0.0.2-0.20210330140417-936748d3f5ec h1:gExwWUiT1TcARkxGneS4nvp9C+wBsKU0bFdg7qFpNco=
github.com/filecoin-project/go-paramfetch v0.0.2-0.20210330140417-936748d3f5ec/go.mod h1:fZzmf4tftbwf9S37XRifoJlz7nCjRdIrMGLR07dKLCc=
github.com/filecoin-project/go-paramfetch v0.0.2-0.20210614165157-25a6c7769498 h1:G10ezOvpH1CLXQ19EA9VWNwyL0mg536ujSayjV0yg0k=
github.com/filecoin-project/go-paramfetch v0.0.2-0.20210614165157-25a6c7769498/go.mod h1:1FH85P8U+DUEmWk1Jkw3Bw7FrwTVUNHk/95PSPG+dts=
github.com/filecoin-project/go-state-types v0.0.0-20200903145444-247639ffa6ad/go.mod h1:IQ0MBPnonv35CJHtWSN3YY1Hz2gkPru1Q9qoaYLxx9I=
github.com/filecoin-project/go-state-types v0.0.0-20200904021452-1883f36ca2f4/go.mod h1:IQ0MBPnonv35CJHtWSN3YY1Hz2gkPru1Q9qoaYLxx9I=
github.com/filecoin-project/go-state-types v0.0.0-20200928172055-2df22083d8ab/go.mod h1:ezYnPf0bNkTsDibL/psSz5dy4B5awOJ/E7P2Saeep8g=
@ -705,6 +705,8 @@ github.com/ipfs/go-log/v2 v2.1.2-0.20200626104915-0016c0b4b3e4/go.mod h1:2v2nsGf
github.com/ipfs/go-log/v2 v2.1.2/go.mod h1:2v2nsGfZsvvAJz13SyFzf9ObaqwHiHxsPLEHntrv9KM=
github.com/ipfs/go-log/v2 v2.1.3 h1:1iS3IU7aXRlbgUpN8yTTpJ53NXYjAe37vcI5+5nYrzk=
github.com/ipfs/go-log/v2 v2.1.3/go.mod h1:/8d0SH3Su5Ooc31QlL1WysJhvyOTDCjcCZ9Axpmri6g=
github.com/ipfs/go-log/v2 v2.1.3 h1:1iS3IU7aXRlbgUpN8yTTpJ53NXYjAe37vcI5+5nYrzk=
github.com/ipfs/go-log/v2 v2.1.3/go.mod h1:/8d0SH3Su5Ooc31QlL1WysJhvyOTDCjcCZ9Axpmri6g=
github.com/ipfs/go-merkledag v0.0.3/go.mod h1:Oc5kIXLHokkE1hWGMBHw+oxehkAaTOqtEb7Zbh6BhLA=
github.com/ipfs/go-merkledag v0.0.6/go.mod h1:QYPdnlvkOg7GnQRofu9XZimC5ZW5Wi3bKys/4GQQfto=
github.com/ipfs/go-merkledag v0.2.3/go.mod h1:SQiXrtSts3KGNmgOzMICy5c0POOpUNQLvB3ClKnBAlk=
@ -38,6 +38,7 @@ var (
	MessageTo, _     = tag.NewKey("message_to")
	MessageNonce, _  = tag.NewKey("message_nonce")
	ReceivedFrom, _  = tag.NewKey("received_from")
	MsgValid, _      = tag.NewKey("message_valid")
	Endpoint, _      = tag.NewKey("endpoint")
	APIInterface, _  = tag.NewKey("api") // to distinguish between gateway api and full node api endpoint calls

@ -61,6 +62,12 @@ var (
	MessageReceived           = stats.Int64("message/received", "Counter for total received messages", stats.UnitDimensionless)
	MessageValidationFailure  = stats.Int64("message/failure", "Counter for message validation failures", stats.UnitDimensionless)
	MessageValidationSuccess  = stats.Int64("message/success", "Counter for message validation successes", stats.UnitDimensionless)
	MessageValidationDuration = stats.Float64("message/validation_ms", "Duration of message validation", stats.UnitMilliseconds)
	MpoolGetNonceDuration     = stats.Float64("mpool/getnonce_ms", "Duration of getStateNonce in mpool", stats.UnitMilliseconds)
	MpoolGetBalanceDuration   = stats.Float64("mpool/getbalance_ms", "Duration of getStateBalance in mpool", stats.UnitMilliseconds)
	MpoolAddTsDuration        = stats.Float64("mpool/addts_ms", "Duration of addTs in mpool", stats.UnitMilliseconds)
	MpoolAddDuration          = stats.Float64("mpool/add_ms", "Duration of Add in mpool", stats.UnitMilliseconds)
	MpoolPushDuration         = stats.Float64("mpool/push_ms", "Duration of Push in mpool", stats.UnitMilliseconds)
	BlockPublished            = stats.Int64("block/published", "Counter for total locally published blocks", stats.UnitDimensionless)
	BlockReceived             = stats.Int64("block/received", "Counter for total received blocks", stats.UnitDimensionless)
	BlockValidationFailure    = stats.Int64("block/failure", "Counter for block validation failures", stats.UnitDimensionless)
@ -170,6 +177,31 @@ var (
		Measure:     MessageValidationSuccess,
		Aggregation: view.Count(),
	}
	MessageValidationDurationView = &view.View{
		Measure:     MessageValidationDuration,
		Aggregation: defaultMillisecondsDistribution,
		TagKeys:     []tag.Key{MsgValid, Local},
	}
	MpoolGetNonceDurationView = &view.View{
		Measure:     MpoolGetNonceDuration,
		Aggregation: defaultMillisecondsDistribution,
	}
	MpoolGetBalanceDurationView = &view.View{
		Measure:     MpoolGetBalanceDuration,
		Aggregation: defaultMillisecondsDistribution,
	}
	MpoolAddTsDurationView = &view.View{
		Measure:     MpoolAddTsDuration,
		Aggregation: defaultMillisecondsDistribution,
	}
	MpoolAddDurationView = &view.View{
		Measure:     MpoolAddDuration,
		Aggregation: defaultMillisecondsDistribution,
	}
	MpoolPushDurationView = &view.View{
		Measure:     MpoolPushDuration,
		Aggregation: defaultMillisecondsDistribution,
	}
	PeerCountView = &view.View{
		Measure:     PeerCount,
		Aggregation: view.LastValue(),
@ -313,6 +345,12 @@ var ChainNodeViews = append([]*view.View{
	MessageReceivedView,
	MessageValidationFailureView,
	MessageValidationSuccessView,
	MessageValidationDurationView,
	MpoolGetNonceDurationView,
	MpoolGetBalanceDurationView,
	MpoolAddTsDurationView,
	MpoolAddDurationView,
	MpoolPushDurationView,
	PubsubPublishMessageView,
	PubsubDeliverMessageView,
	PubsubRejectMessageView,
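As a usage sketch (assuming standard opencensus APIs; the surrounding timedPush function is hypothetical, not code from this commit), the new duration measures are recorded against a context like this, and the matching views above aggregate the samples into the default milliseconds distribution:

package main

import (
	"context"
	"time"

	"go.opencensus.io/stats"

	"github.com/filecoin-project/lotus/metrics"
)

func timedPush(ctx context.Context) {
	start := time.Now()
	defer func() {
		// Record elapsed wall time in milliseconds; MpoolPushDurationView
		// picks these samples up via the mpool/push_ms measure.
		stats.Record(ctx, metrics.MpoolPushDuration.M(float64(time.Since(start).Milliseconds())))
	}()
	// ... push the message here ...
}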
@ -97,6 +97,9 @@ type SealingConfig struct {

	AlwaysKeepUnsealedCopy bool

	// Run sector finalization before submitting sector proof to the chain
	FinalizeEarly bool

	// enable / disable precommit batching (takes effect after nv13)
	BatchPreCommits bool
	// maximum precommit batch size - batches will be sent immediately above this size
@ -290,6 +293,7 @@ func DefaultStorageMiner() *StorageMiner {
			MaxSealingSectorsForDeals: 0,
			WaitDealsDelay:            Duration(time.Hour * 6),
			AlwaysKeepUnsealedCopy:    true,
			FinalizeEarly:             false,

			BatchPreCommits:   true,
			MinPreCommitBatch: 1, // we must have at least one precommit to batch
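Per the comments above: precommit batching only takes effect after network version 13, a batch is sent immediately once it grows past MaxPreCommitBatch, and MinPreCommitBatch defaults to 1 because a batch needs at least one precommit; otherwise batches go out on the wait/slack timers exercised in the test earlier in this diff.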
@ -226,15 +226,15 @@ func (a *MpoolAPI) MpoolBatchPushMessage(ctx context.Context, msgs []*types.Mess
}

func (a *MpoolAPI) MpoolCheckMessages(ctx context.Context, protos []*api.MessagePrototype) ([][]api.MessageCheckStatus, error) {
	return a.Mpool.CheckMessages(protos)
	return a.Mpool.CheckMessages(ctx, protos)
}

func (a *MpoolAPI) MpoolCheckPendingMessages(ctx context.Context, from address.Address) ([][]api.MessageCheckStatus, error) {
	return a.Mpool.CheckPendingMessages(from)
	return a.Mpool.CheckPendingMessages(ctx, from)
}

func (a *MpoolAPI) MpoolCheckReplaceMessages(ctx context.Context, msgs []*types.Message) ([][]api.MessageCheckStatus, error) {
	return a.Mpool.CheckReplaceMessages(msgs)
	return a.Mpool.CheckReplaceMessages(ctx, msgs)
}

func (a *MpoolAPI) MpoolGetNonce(ctx context.Context, addr address.Address) (uint64, error) {
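These wrappers already received a ctx; the change threads it down into the message pool's check methods, presumably so the pool-level code can participate in the mpool duration metrics and context cancellation introduced elsewhere in this merge.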
@ -843,6 +843,7 @@ func NewSetSealConfigFunc(r repo.LockedRepo) (dtypes.SetSealingConfigFunc, error
			MaxSealingSectorsForDeals: cfg.MaxSealingSectorsForDeals,
			WaitDealsDelay:            config.Duration(cfg.WaitDealsDelay),
			AlwaysKeepUnsealedCopy:    cfg.AlwaysKeepUnsealedCopy,
			FinalizeEarly:             cfg.FinalizeEarly,

			BatchPreCommits:   cfg.BatchPreCommits,
			MinPreCommitBatch: cfg.MinPreCommitBatch,
@ -874,6 +875,7 @@ func NewGetSealConfigFunc(r repo.LockedRepo) (dtypes.GetSealingConfigFunc, error
			MaxSealingSectorsForDeals: cfg.Sealing.MaxSealingSectorsForDeals,
			WaitDealsDelay:            time.Duration(cfg.Sealing.WaitDealsDelay),
			AlwaysKeepUnsealedCopy:    cfg.Sealing.AlwaysKeepUnsealedCopy,
			FinalizeEarly:             cfg.Sealing.FinalizeEarly,

			BatchPreCommits:   cfg.Sealing.BatchPreCommits,
			MinPreCommitBatch: cfg.Sealing.MinPreCommitBatch,