Merge branch 'master' into raulk/itests-refactor-kit

This commit is contained in:
Raúl Kripalani 2021-06-16 11:02:16 +01:00
commit 305e3245b6
43 changed files with 1909 additions and 285 deletions

View File

@ -133,6 +133,9 @@ jobs:
deadline-test:
type: string
default: "0"
proofs-log-test:
type: string
default: "0"
test-suite-name:
type: string
default: unit
@ -167,6 +170,7 @@ jobs:
environment:
LOTUS_TEST_WINDOW_POST: << parameters.winpost-test >>
LOTUS_TEST_DEADLINE_TOGGLING: << parameters.deadline-test >>
TEST_RUSTPROOFS_LOGS: << parameters.proofs-log-test >>
SKIP_CONFORMANCE: "1"
command: |
mkdir -p /tmp/test-reports/<< parameters.test-suite-name >>
@ -212,6 +216,8 @@ jobs:
<<: *test
test-terminate:
<<: *test
check-proofs-multicore-sdr:
<<: *test
test-conformance:
description: |
Run tests using a corpus of interoperable test vectors for Filecoin
@ -815,6 +821,12 @@ workflows:
tags:
only:
- /^v\d+\.\d+\.\d+(-rc\d+)?$/
- check-proofs-multicore-sdr:
codecov-upload: true
go-test-flags: "-run=TestMulticoreSDR"
test-suite-name: multicore-sdr-check
packages: "./extern/sector-storage/ffiwrapper"
proofs-log-test: "1"
- test-conformance:
test-suite-name: conformance
packages: "./conformance"

View File

@ -70,143 +70,6 @@ This is an optional Lotus release that introduces various improvements to the se
- fix health report (https://github.com/filecoin-project/lotus/pull/6011)
- fix(ci): Use recent ubuntu LTS release; Update release params ((https://github.com/filecoin-project/lotus/pull/6011))
# 1.9.0-rc4 / 2021-05-13
This is an optional Lotus release that introduces various improvements to the sealing, mining, and deal-making processes.
## Highlights
- OpenRPC Support (https://github.com/filecoin-project/lotus/pull/5843)
- Take latency into account when making interactive deals (https://github.com/filecoin-project/lotus/pull/5876)
- Update go-commp-utils for >10x faster client commp calculation (https://github.com/filecoin-project/lotus/pull/5892)
- add `lotus client cancel-retrieval` cmd to lotus CLI (https://github.com/filecoin-project/lotus/pull/5871)
- add `inspect-deal` command to `lotus client` (https://github.com/filecoin-project/lotus/pull/5833)
- Local retrieval support (https://github.com/filecoin-project/lotus/pull/5917)
- go-fil-markets v1.1.9 -> v1.2.5
- For a detailed changelog see https://github.com/filecoin-project/go-fil-markets/blob/master/CHANGELOG.md
- rust-fil-proofs v5.4.1 -> v7.0.1
- For a detailed changelog see https://github.com/filecoin-project/rust-fil-proofs/blob/master/CHANGELOG.md
## Changes
- storagefsm: Apply global events even in broken states (https://github.com/filecoin-project/lotus/pull/5962)
- Default the AlwaysKeepUnsealedCopy flag to true (https://github.com/filecoin-project/lotus/pull/5743)
- splitstore: compact hotstore prior to garbage collection (https://github.com/filecoin-project/lotus/pull/5778)
- ipfs-force bootstrapper update (https://github.com/filecoin-project/lotus/pull/5799)
- better logging when unsealing fails (https://github.com/filecoin-project/lotus/pull/5851)
- perf: add cache for gas permium estimation (https://github.com/filecoin-project/lotus/pull/5709)
- backupds: Compact log on restart (https://github.com/filecoin-project/lotus/pull/5875)
- backupds: Improve truncated log handling (https://github.com/filecoin-project/lotus/pull/5891)
- State CLI improvements (State CLI improvements)
- API proxy struct codegen (https://github.com/filecoin-project/lotus/pull/5854)
- move DI stuff for paychmgr into modules (https://github.com/filecoin-project/lotus/pull/5791)
- Implement Event observer and Settings for 3rd party dep injection (https://github.com/filecoin-project/lotus/pull/5693)
- Export developer and network commands for consumption by derivatives of Lotus (https://github.com/filecoin-project/lotus/pull/5864)
- mock sealer: Simulate randomness sideeffects (https://github.com/filecoin-project/lotus/pull/5805)
- localstorage: Demote reservation stat error to debug (https://github.com/filecoin-project/lotus/pull/5976)
- shed command to unpack miner info dumps (https://github.com/filecoin-project/lotus/pull/5800)
- Add two utils to Lotus-shed (https://github.com/filecoin-project/lotus/pull/5867)
- add shed election estimate command (https://github.com/filecoin-project/lotus/pull/5092)
- Add --actor flag in lotus-shed sectors terminate (https://github.com/filecoin-project/lotus/pull/5819)
- Move lotus mpool clear to lotus-shed (https://github.com/filecoin-project/lotus/pull/5900)
- Centralize everything on ipfs/go-log/v2 (https://github.com/filecoin-project/lotus/pull/5974)
- expose NextID from nice market actor interface (https://github.com/filecoin-project/lotus/pull/5850)
- add available options for perm on error (https://github.com/filecoin-project/lotus/pull/5814)
- API docs clarification: Document StateSearchMsg replaced message behavior (https://github.com/filecoin-project/lotus/pull/5838)
- api: Document StateReplay replaced message behavior (https://github.com/filecoin-project/lotus/pull/5840)
- add godocs to miner objects (https://github.com/filecoin-project/lotus/pull/2184)
- Add description to the client deal CLI command (https://github.com/filecoin-project/lotus/pull/5999)
- lint: don't skip builtin (https://github.com/filecoin-project/lotus/pull/5881)
- use deal duration from actors (https://github.com/filecoin-project/lotus/pull/5270)
- remote calc winningpost proof (https://github.com/filecoin-project/lotus/pull/5884)
- packer: other network images (https://github.com/filecoin-project/lotus/pull/5930)
- Convert the chainstore lock to RW (https://github.com/filecoin-project/lotus/pull/5971)
- Remove CachedBlockstore (https://github.com/filecoin-project/lotus/pull/5972)
- remove messagepool CapGasFee duplicate code (https://github.com/filecoin-project/lotus/pull/5992)
- Add a mining-heartbeat INFO line at every epoch (https://github.com/filecoin-project/lotus/pull/6183)
- chore(ci): Enable build on RC tags (https://github.com/filecoin-project/lotus/pull/6245)
- Upgrade nerpa to actor v4 and bump the version to rc4 (https://github.com/filecoin-project/lotus/pull/6249)
## Fixes
- return buffers after canceling badger operation (https://github.com/filecoin-project/lotus/pull/5796)
- avoid holding a lock while calling the View callback (https://github.com/filecoin-project/lotus/pull/5792)
- storagefsm: Trigger input processing when below limits (https://github.com/filecoin-project/lotus/pull/5801)
- After importing a previously deleted key, be able to delete it again (https://github.com/filecoin-project/lotus/pull/4653)
- fix StateManager.Replay on reward actor (https://github.com/filecoin-project/lotus/pull/5804)
- make sure atomic 64bit fields are 64bit aligned (https://github.com/filecoin-project/lotus/pull/5794)
- Import secp sigs in paych tests (https://github.com/filecoin-project/lotus/pull/5879)
- fix ci build-macos (https://github.com/filecoin-project/lotus/pull/5934)
- Fix creation of remainder account when it's not a multisig (https://github.com/filecoin-project/lotus/pull/5807)
- Fix fallback chainstore (https://github.com/filecoin-project/lotus/pull/6003)
- fix 4857: show help for set-addrs (https://github.com/filecoin-project/lotus/pull/5943)
- fix health report (https://github.com/filecoin-project/lotus/pull/6011)
# 1.9.0-rc2 / 2021-04-30
This is an optional Lotus release that introduces various improvements to the sealing, mining, and deal-making processes.
## Highlights
- OpenRPC Support (https://github.com/filecoin-project/lotus/pull/5843)
- Take latency into account when making interactive deals (https://github.com/filecoin-project/lotus/pull/5876)
- Update go-commp-utils for >10x faster client commp calculation (https://github.com/filecoin-project/lotus/pull/5892)
- add `lotus client cancel-retrieval` cmd to lotus CLI (https://github.com/filecoin-project/lotus/pull/5871)
- add `inspect-deal` command to `lotus client` (https://github.com/filecoin-project/lotus/pull/5833)
- Local retrieval support (https://github.com/filecoin-project/lotus/pull/5917)
- go-fil-markets v1.1.9 -> v1.2.5
- For a detailed changelog see https://github.com/filecoin-project/go-fil-markets/blob/master/CHANGELOG.md
- rust-fil-proofs v5.4.1 -> v7
- For a detailed changelog see https://github.com/filecoin-project/rust-fil-proofs/blob/master/CHANGELOG.md
## Changes
- storagefsm: Apply global events even in broken states (https://github.com/filecoin-project/lotus/pull/5962)
- Default the AlwaysKeepUnsealedCopy flag to true (https://github.com/filecoin-project/lotus/pull/5743)
- splitstore: compact hotstore prior to garbage collection (https://github.com/filecoin-project/lotus/pull/5778)
- ipfs-force bootstrapper update (https://github.com/filecoin-project/lotus/pull/5799)
- better logging when unsealing fails (https://github.com/filecoin-project/lotus/pull/5851)
- perf: add cache for gas permium estimation (https://github.com/filecoin-project/lotus/pull/5709)
- backupds: Compact log on restart (https://github.com/filecoin-project/lotus/pull/5875)
- backupds: Improve truncated log handling (https://github.com/filecoin-project/lotus/pull/5891)
- State CLI improvements (State CLI improvements)
- API proxy struct codegen (https://github.com/filecoin-project/lotus/pull/5854)
- move DI stuff for paychmgr into modules (https://github.com/filecoin-project/lotus/pull/5791)
- Implement Event observer and Settings for 3rd party dep injection (https://github.com/filecoin-project/lotus/pull/5693)
- Export developer and network commands for consumption by derivatives of Lotus (https://github.com/filecoin-project/lotus/pull/5864)
- mock sealer: Simulate randomness sideeffects (https://github.com/filecoin-project/lotus/pull/5805)
- localstorage: Demote reservation stat error to debug (https://github.com/filecoin-project/lotus/pull/5976)
- shed command to unpack miner info dumps (https://github.com/filecoin-project/lotus/pull/5800)
- Add two utils to Lotus-shed (https://github.com/filecoin-project/lotus/pull/5867)
- add shed election estimate command (https://github.com/filecoin-project/lotus/pull/5092)
- Add --actor flag in lotus-shed sectors terminate (https://github.com/filecoin-project/lotus/pull/5819)
- Move lotus mpool clear to lotus-shed (https://github.com/filecoin-project/lotus/pull/5900)
- Centralize everything on ipfs/go-log/v2 (https://github.com/filecoin-project/lotus/pull/5974)
- expose NextID from nice market actor interface (https://github.com/filecoin-project/lotus/pull/5850)
- add available options for perm on error (https://github.com/filecoin-project/lotus/pull/5814)
- API docs clarification: Document StateSearchMsg replaced message behavior (https://github.com/filecoin-project/lotus/pull/5838)
- api: Document StateReplay replaced message behavior (https://github.com/filecoin-project/lotus/pull/5840)
- add godocs to miner objects (https://github.com/filecoin-project/lotus/pull/2184)
- Add description to the client deal CLI command (https://github.com/filecoin-project/lotus/pull/5999)
- lint: don't skip builtin (https://github.com/filecoin-project/lotus/pull/5881)
- use deal duration from actors (https://github.com/filecoin-project/lotus/pull/5270)
- remote calc winningpost proof (https://github.com/filecoin-project/lotus/pull/5884)
- packer: other network images (https://github.com/filecoin-project/lotus/pull/5930)
- Convert the chainstore lock to RW (https://github.com/filecoin-project/lotus/pull/5971)
- Remove CachedBlockstore (https://github.com/filecoin-project/lotus/pull/5972)
- remove messagepool CapGasFee duplicate code (https://github.com/filecoin-project/lotus/pull/5992)
## Fixes
- return buffers after canceling badger operation (https://github.com/filecoin-project/lotus/pull/5796)
- avoid holding a lock while calling the View callback (https://github.com/filecoin-project/lotus/pull/5792)
- storagefsm: Trigger input processing when below limits (https://github.com/filecoin-project/lotus/pull/5801)
- After importing a previously deleted key, be able to delete it again (https://github.com/filecoin-project/lotus/pull/4653)
- fix StateManager.Replay on reward actor (https://github.com/filecoin-project/lotus/pull/5804)
- make sure atomic 64bit fields are 64bit aligned (https://github.com/filecoin-project/lotus/pull/5794)
- Import secp sigs in paych tests (https://github.com/filecoin-project/lotus/pull/5879)
- fix ci build-macos (https://github.com/filecoin-project/lotus/pull/5934)
- Fix creation of remainder account when it's not a multisig (https://github.com/filecoin-project/lotus/pull/5807)
- Fix fallback chainstore (https://github.com/filecoin-project/lotus/pull/6003)
- fix 4857: show help for set-addrs (https://github.com/filecoin-project/lotus/pull/5943)
- fix health report (https://github.com/filecoin-project/lotus/pull/6011)
# 1.8.0 / 2021-04-05
This is a mandatory release of Lotus that upgrades the network to version 12, which introduces various performance improvements to the cron processing of the power actor. The network will upgrade at height 712320, which is 2021-04-29T06:00:00Z.

View File

@ -629,7 +629,7 @@ type FullNode interface {
// proposal. This method of approval can be used to ensure you only approve
// exactly the transaction you think you are.
// It takes the following params: <multisig address>, <proposed message ID>, <proposer address>, <recipient address>, <value to transfer>,
// <sender address of the approve msg>, <method to call in the proposed message>, <params to include in the proposed message>
// <sender address of the approve msg>, <method to call in the approved message>, <params to include in the proposed message>
MsigApproveTxnHash(context.Context, address.Address, uint64, address.Address, address.Address, types.BigInt, address.Address, uint64, []byte) (cid.Cid, error) //perm:sign
// MsigCancel cancels a previously-proposed multisig message

View File

@ -45,7 +45,8 @@ const UpgradeNorwegianHeight = 114000
const UpgradeTurboHeight = 193789
const UpgradeHyperdriveHeight = 9999999
// 2021-06-11T14:30:00Z
const UpgradeHyperdriveHeight = 321519
func init() {
policy.SetConsensusMinerMinPower(abi.NewStoragePower(32 << 30))

View File

@ -3,6 +3,8 @@ package policy
import (
"sort"
"github.com/filecoin-project/go-state-types/big"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/go-state-types/network"
"github.com/filecoin-project/lotus/chain/actors"
@ -367,3 +369,31 @@ func GetDeclarationsMax(nwVer network.Version) int {
panic("unsupported network version")
}
}
func AggregateNetworkFee(nwVer network.Version, aggregateSize int, baseFee abi.TokenAmount) abi.TokenAmount {
switch actors.VersionForNetwork(nwVer) {
case actors.Version0:
return big.Zero()
case actors.Version2:
return big.Zero()
case actors.Version3:
return big.Zero()
case actors.Version4:
return big.Zero()
case actors.Version5:
return miner5.AggregateNetworkFee(aggregateSize, baseFee)
default:
panic("unsupported network version")
}
}

View File

@ -3,6 +3,8 @@ package policy
import (
"sort"
"github.com/filecoin-project/go-state-types/big"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/go-state-types/network"
"github.com/filecoin-project/lotus/chain/actors"
@ -246,3 +248,18 @@ func GetDeclarationsMax(nwVer network.Version) int {
panic("unsupported network version")
}
}
func AggregateNetworkFee(nwVer network.Version, aggregateSize int, baseFee abi.TokenAmount) abi.TokenAmount {
switch actors.VersionForNetwork(nwVer) {
{{range .versions}}
case actors.Version{{.}}:
{{if (le . 4)}}
return big.Zero()
{{else}}
return miner{{.}}.AggregateNetworkFee(aggregateSize, baseFee)
{{end}}
{{end}}
default:
panic("unsupported network version")
}
}

View File

@ -19,18 +19,18 @@ import (
var baseFeeUpperBoundFactor = types.NewInt(10)
// CheckMessages performs a set of logic checks for a list of messages, prior to submitting it to the mpool
func (mp *MessagePool) CheckMessages(protos []*api.MessagePrototype) ([][]api.MessageCheckStatus, error) {
func (mp *MessagePool) CheckMessages(ctx context.Context, protos []*api.MessagePrototype) ([][]api.MessageCheckStatus, error) {
flex := make([]bool, len(protos))
msgs := make([]*types.Message, len(protos))
for i, p := range protos {
flex[i] = !p.ValidNonce
msgs[i] = &p.Message
}
return mp.checkMessages(msgs, false, flex)
return mp.checkMessages(ctx, msgs, false, flex)
}
// CheckPendingMessages performs a set of logical sets for all messages pending from a given actor
func (mp *MessagePool) CheckPendingMessages(from address.Address) ([][]api.MessageCheckStatus, error) {
func (mp *MessagePool) CheckPendingMessages(ctx context.Context, from address.Address) ([][]api.MessageCheckStatus, error) {
var msgs []*types.Message
mp.lk.Lock()
mset, ok := mp.pending[from]
@ -49,12 +49,12 @@ func (mp *MessagePool) CheckPendingMessages(from address.Address) ([][]api.Messa
return msgs[i].Nonce < msgs[j].Nonce
})
return mp.checkMessages(msgs, true, nil)
return mp.checkMessages(ctx, msgs, true, nil)
}
// CheckReplaceMessages performs a set of logical checks for related messages while performing a
// replacement.
func (mp *MessagePool) CheckReplaceMessages(replace []*types.Message) ([][]api.MessageCheckStatus, error) {
func (mp *MessagePool) CheckReplaceMessages(ctx context.Context, replace []*types.Message) ([][]api.MessageCheckStatus, error) {
msgMap := make(map[address.Address]map[uint64]*types.Message)
count := 0
@ -94,12 +94,12 @@ func (mp *MessagePool) CheckReplaceMessages(replace []*types.Message) ([][]api.M
start = end
}
return mp.checkMessages(msgs, true, nil)
return mp.checkMessages(ctx, msgs, true, nil)
}
// flexibleNonces should be either nil or of len(msgs), it signifies that message at given index
// has non-determied nonce at this point
func (mp *MessagePool) checkMessages(msgs []*types.Message, interned bool, flexibleNonces []bool) (result [][]api.MessageCheckStatus, err error) {
func (mp *MessagePool) checkMessages(ctx context.Context, msgs []*types.Message, interned bool, flexibleNonces []bool) (result [][]api.MessageCheckStatus, err error) {
if mp.api.IsLite() {
return nil, nil
}
@ -160,7 +160,7 @@ func (mp *MessagePool) checkMessages(msgs []*types.Message, interned bool, flexi
} else {
mp.lk.Unlock()
stateNonce, err := mp.getStateNonce(m.From, curTs)
stateNonce, err := mp.getStateNonce(ctx, m.From, curTs)
if err != nil {
check.OK = false
check.Err = fmt.Sprintf("error retrieving state nonce: %s", err.Error())
@ -193,7 +193,7 @@ func (mp *MessagePool) checkMessages(msgs []*types.Message, interned bool, flexi
balance, ok := balances[m.From]
if !ok {
balance, err = mp.getStateBalance(m.From, curTs)
balance, err = mp.getStateBalance(ctx, m.From, curTs)
if err != nil {
check.OK = false
check.Err = fmt.Sprintf("error retrieving state balance: %s", err)

View File

@ -34,6 +34,7 @@ import (
"github.com/filecoin-project/lotus/chain/vm"
"github.com/filecoin-project/lotus/journal"
"github.com/filecoin-project/lotus/lib/sigs"
"github.com/filecoin-project/lotus/metrics"
"github.com/filecoin-project/lotus/node/modules/dtypes"
"github.com/raulk/clock"
@ -577,7 +578,7 @@ func (mp *MessagePool) addLocal(ctx context.Context, m *types.SignedMessage) err
return nil
}
// verifyMsgBeforeAdd verifies that the message meets the minimum criteria for block inclusio
// verifyMsgBeforeAdd verifies that the message meets the minimum criteria for block inclusion
// and whether the message has enough funds to be included in the next 20 blocks.
// If the message is not valid for block inclusion, it returns an error.
// For local messages, if the message can be included in the next 20 blocks, it returns true to
@ -631,6 +632,9 @@ func (mp *MessagePool) verifyMsgBeforeAdd(m *types.SignedMessage, curTs *types.T
}
func (mp *MessagePool) Push(ctx context.Context, m *types.SignedMessage) (cid.Cid, error) {
done := metrics.Timer(ctx, metrics.MpoolPushDuration)
defer done()
err := mp.checkMessage(m)
if err != nil {
return cid.Undef, err
@ -697,6 +701,9 @@ func (mp *MessagePool) checkMessage(m *types.SignedMessage) error {
}
func (mp *MessagePool) Add(ctx context.Context, m *types.SignedMessage) error {
done := metrics.Timer(ctx, metrics.MpoolAddDuration)
defer done()
err := mp.checkMessage(m)
if err != nil {
return err
@ -752,7 +759,7 @@ func (mp *MessagePool) VerifyMsgSig(m *types.SignedMessage) error {
}
func (mp *MessagePool) checkBalance(ctx context.Context, m *types.SignedMessage, curTs *types.TipSet) error {
balance, err := mp.getStateBalance(m.Message.From, curTs)
balance, err := mp.getStateBalance(ctx, m.Message.From, curTs)
if err != nil {
return xerrors.Errorf("failed to check sender balance: %s: %w", err, ErrSoftValidationFailure)
}
@ -785,7 +792,10 @@ func (mp *MessagePool) checkBalance(ctx context.Context, m *types.SignedMessage,
}
func (mp *MessagePool) addTs(ctx context.Context, m *types.SignedMessage, curTs *types.TipSet, local, untrusted bool) (bool, error) {
snonce, err := mp.getStateNonce(m.Message.From, curTs)
done := metrics.Timer(ctx, metrics.MpoolAddTsDuration)
defer done()
snonce, err := mp.getStateNonce(ctx, m.Message.From, curTs)
if err != nil {
return false, xerrors.Errorf("failed to look up actor state nonce: %s: %w", err, ErrSoftValidationFailure)
}
@ -833,7 +843,7 @@ func (mp *MessagePool) addLoaded(ctx context.Context, m *types.SignedMessage) er
return xerrors.Errorf("current tipset not loaded")
}
snonce, err := mp.getStateNonce(m.Message.From, curTs)
snonce, err := mp.getStateNonce(ctx, m.Message.From, curTs)
if err != nil {
return xerrors.Errorf("failed to look up actor state nonce: %s: %w", err, ErrSoftValidationFailure)
}
@ -885,7 +895,7 @@ func (mp *MessagePool) addLocked(ctx context.Context, m *types.SignedMessage, st
}
if !ok {
nonce, err := mp.getStateNonce(m.Message.From, mp.curTs)
nonce, err := mp.getStateNonce(ctx, m.Message.From, mp.curTs)
if err != nil {
return xerrors.Errorf("failed to get initial actor nonce: %w", err)
}
@ -946,7 +956,7 @@ func (mp *MessagePool) GetActor(_ context.Context, addr address.Address, _ types
}
func (mp *MessagePool) getNonceLocked(ctx context.Context, addr address.Address, curTs *types.TipSet) (uint64, error) {
stateNonce, err := mp.getStateNonce(addr, curTs) // sanity check
stateNonce, err := mp.getStateNonce(ctx, addr, curTs) // sanity check
if err != nil {
return 0, err
}
@ -970,7 +980,10 @@ func (mp *MessagePool) getNonceLocked(ctx context.Context, addr address.Address,
return stateNonce, nil
}
func (mp *MessagePool) getStateNonce(addr address.Address, ts *types.TipSet) (uint64, error) {
func (mp *MessagePool) getStateNonce(ctx context.Context, addr address.Address, ts *types.TipSet) (uint64, error) {
done := metrics.Timer(ctx, metrics.MpoolGetNonceDuration)
defer done()
act, err := mp.api.GetActorAfter(addr, ts)
if err != nil {
return 0, err
@ -979,7 +992,10 @@ func (mp *MessagePool) getStateNonce(addr address.Address, ts *types.TipSet) (ui
return act.Nonce, nil
}
func (mp *MessagePool) getStateBalance(addr address.Address, ts *types.TipSet) (types.BigInt, error) {
func (mp *MessagePool) getStateBalance(ctx context.Context, addr address.Address, ts *types.TipSet) (types.BigInt, error) {
done := metrics.Timer(ctx, metrics.MpoolGetBalanceDuration)
defer done()
act, err := mp.api.GetActorAfter(addr, ts)
if err != nil {
return types.EmptyInt, err

View File

@ -507,6 +507,12 @@ func (mv *MessageValidator) Validate(ctx context.Context, pid peer.ID, msg *pubs
return mv.validateLocalMessage(ctx, msg)
}
start := time.Now()
defer func() {
ms := time.Now().Sub(start).Microseconds()
stats.Record(ctx, metrics.MessageValidationDuration.M(float64(ms)/1000))
}()
stats.Record(ctx, metrics.MessageReceived.M(1))
m, err := types.DecodeSignedMessage(msg.Message.GetData())
if err != nil {
@ -538,6 +544,12 @@ func (mv *MessageValidator) Validate(ctx context.Context, pid peer.ID, msg *pubs
return pubsub.ValidationReject
}
}
ctx, _ = tag.New(
ctx,
tag.Upsert(metrics.MsgValid, "true"),
)
stats.Record(ctx, metrics.MessageValidationSuccess.M(1))
return pubsub.ValidationAccept
}
@ -547,6 +559,13 @@ func (mv *MessageValidator) validateLocalMessage(ctx context.Context, msg *pubsu
ctx,
tag.Upsert(metrics.Local, "true"),
)
start := time.Now()
defer func() {
ms := time.Now().Sub(start).Microseconds()
stats.Record(ctx, metrics.MessageValidationDuration.M(float64(ms)/1000))
}()
// do some lightweight validation
stats.Record(ctx, metrics.MessagePublished.M(1))
@ -581,6 +600,11 @@ func (mv *MessageValidator) validateLocalMessage(ctx context.Context, msg *pubsu
return pubsub.ValidationIgnore
}
ctx, _ = tag.New(
ctx,
tag.Upsert(metrics.MsgValid, "true"),
)
stats.Record(ctx, metrics.MessageValidationSuccess.M(1))
return pubsub.ValidationAccept
}

View File

@ -536,7 +536,7 @@ var ChainListCmd = &cli.Command{
Aliases: []string{"love"},
Usage: "View a segment of the chain",
Flags: []cli.Flag{
&cli.Uint64Flag{Name: "height"},
&cli.Uint64Flag{Name: "height", DefaultText: "current head"},
&cli.IntFlag{Name: "count", Value: 30},
&cli.StringFlag{
Name: "format",

View File

@ -0,0 +1,103 @@
package main
import (
"fmt"
"io"
"os"
"github.com/ipfs/go-blockservice"
"github.com/ipfs/go-cid"
offline "github.com/ipfs/go-ipfs-exchange-offline"
format "github.com/ipfs/go-ipld-format"
"github.com/ipfs/go-merkledag"
"github.com/ipld/go-car"
"github.com/urfave/cli/v2"
"golang.org/x/xerrors"
lcli "github.com/filecoin-project/lotus/cli"
"github.com/filecoin-project/lotus/node/repo"
)
func carWalkFunc(nd format.Node) (out []*format.Link, err error) {
for _, link := range nd.Links() {
if link.Cid.Prefix().Codec == cid.FilCommitmentSealed || link.Cid.Prefix().Codec == cid.FilCommitmentUnsealed {
continue
}
out = append(out, link)
}
return out, nil
}
var exportCarCmd = &cli.Command{
Name: "export-car",
Description: "Export a car from repo (requires node to be offline)",
Flags: []cli.Flag{
&cli.StringFlag{
Name: "repo",
Value: "~/.lotus",
},
},
Action: func(cctx *cli.Context) error {
if cctx.Args().Len() != 2 {
return lcli.ShowHelp(cctx, fmt.Errorf("must specify file name and object"))
}
outfile := cctx.Args().First()
var roots []cid.Cid
for _, arg := range cctx.Args().Tail() {
c, err := cid.Decode(arg)
if err != nil {
return err
}
roots = append(roots, c)
}
ctx := lcli.ReqContext(cctx)
r, err := repo.NewFS(cctx.String("repo"))
if err != nil {
return xerrors.Errorf("opening fs repo: %w", err)
}
exists, err := r.Exists()
if err != nil {
return err
}
if !exists {
return xerrors.Errorf("lotus repo doesn't exist")
}
lr, err := r.Lock(repo.FullNode)
if err != nil {
return err
}
defer lr.Close() //nolint:errcheck
fi, err := os.Create(outfile)
if err != nil {
return xerrors.Errorf("opening the output file: %w", err)
}
defer fi.Close() //nolint:errcheck
bs, err := lr.Blockstore(ctx, repo.UniversalBlockstore)
if err != nil {
return fmt.Errorf("failed to open blockstore: %w", err)
}
defer func() {
if c, ok := bs.(io.Closer); ok {
if err := c.Close(); err != nil {
log.Warnf("failed to close blockstore: %s", err)
}
}
}()
dag := merkledag.NewDAGService(blockservice.New(bs, offline.Exchange(bs)))
err = car.WriteCarWithWalker(ctx, dag, roots, fi, carWalkFunc)
if err != nil {
return err
}
return nil
},
}

View File

@ -43,6 +43,7 @@ func main() {
minerCmd,
mpoolStatsCmd,
exportChainCmd,
exportCarCmd,
consensusCmd,
storageStatsCmd,
syncCmd,
@ -58,6 +59,7 @@ func main() {
signaturesCmd,
actorCmd,
minerTypesCmd,
minerMultisigsCmd,
}
app := &cli.App{

View File

@ -0,0 +1,388 @@
package main
import (
"bytes"
"fmt"
"strconv"
"github.com/filecoin-project/go-state-types/abi"
miner5 "github.com/filecoin-project/specs-actors/v5/actors/builtin/miner"
msig5 "github.com/filecoin-project/specs-actors/v5/actors/builtin/multisig"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-state-types/big"
"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain/actors"
"github.com/filecoin-project/lotus/chain/actors/builtin/miner"
"github.com/filecoin-project/lotus/chain/types"
lcli "github.com/filecoin-project/lotus/cli"
"github.com/urfave/cli/v2"
"golang.org/x/xerrors"
)
var minerMultisigsCmd = &cli.Command{
Name: "miner-multisig",
Description: "a collection of utilities for using multisigs as owner addresses of miners",
Subcommands: []*cli.Command{
mmProposeWithdrawBalance,
mmApproveWithdrawBalance,
mmProposeChangeOwner,
mmApproveChangeOwner,
},
Flags: []cli.Flag{
&cli.StringFlag{
Name: "from",
Usage: "specify address to send message from",
Required: true,
},
&cli.StringFlag{
Name: "multisig",
Usage: "specify multisig that will receive the message",
Required: true,
},
&cli.StringFlag{
Name: "miner",
Usage: "specify miner being acted upon",
Required: true,
},
},
}
var mmProposeWithdrawBalance = &cli.Command{
Name: "propose-withdraw",
Usage: "Propose to withdraw FIL from the miner",
ArgsUsage: "[amount]",
Action: func(cctx *cli.Context) error {
if !cctx.Args().Present() {
return fmt.Errorf("must pass amount to withdraw")
}
api, closer, err := lcli.GetFullNodeAPI(cctx)
if err != nil {
return err
}
defer closer()
ctx := lcli.ReqContext(cctx)
multisigAddr, sender, minerAddr, err := getInputs(cctx)
if err != nil {
return err
}
val, err := types.ParseFIL(cctx.Args().First())
if err != nil {
return err
}
sp, err := actors.SerializeParams(&miner5.WithdrawBalanceParams{
AmountRequested: abi.TokenAmount(val),
})
if err != nil {
return err
}
pcid, err := api.MsigPropose(ctx, multisigAddr, minerAddr, big.Zero(), sender, uint64(miner.Methods.WithdrawBalance), sp)
if err != nil {
return xerrors.Errorf("proposing message: %w", err)
}
fmt.Fprintln(cctx.App.Writer, "Propose Message CID:", pcid)
// wait for it to get mined into a block
wait, err := api.StateWaitMsg(ctx, pcid, build.MessageConfidence)
if err != nil {
return err
}
// check it executed successfully
if wait.Receipt.ExitCode != 0 {
fmt.Fprintln(cctx.App.Writer, "Propose owner change tx failed!")
return err
}
var retval msig5.ProposeReturn
if err := retval.UnmarshalCBOR(bytes.NewReader(wait.Receipt.Return)); err != nil {
return fmt.Errorf("failed to unmarshal propose return value: %w", err)
}
fmt.Printf("Transaction ID: %d\n", retval.TxnID)
if retval.Applied {
fmt.Printf("Transaction was executed during propose\n")
fmt.Printf("Exit Code: %d\n", retval.Code)
fmt.Printf("Return Value: %x\n", retval.Ret)
}
return nil
},
}
var mmApproveWithdrawBalance = &cli.Command{
Name: "approve-withdraw",
Usage: "Approve to withdraw FIL from the miner",
ArgsUsage: "[amount txnId proposer]",
Action: func(cctx *cli.Context) error {
if cctx.NArg() != 3 {
return fmt.Errorf("must pass amount, txn Id, and proposer address")
}
api, closer, err := lcli.GetFullNodeAPI(cctx)
if err != nil {
return err
}
defer closer()
ctx := lcli.ReqContext(cctx)
multisigAddr, sender, minerAddr, err := getInputs(cctx)
if err != nil {
return err
}
val, err := types.ParseFIL(cctx.Args().First())
if err != nil {
return err
}
sp, err := actors.SerializeParams(&miner5.WithdrawBalanceParams{
AmountRequested: abi.TokenAmount(val),
})
if err != nil {
return err
}
txid, err := strconv.ParseUint(cctx.Args().Get(1), 10, 64)
if err != nil {
return err
}
proposer, err := address.NewFromString(cctx.Args().Get(2))
if err != nil {
return err
}
acid, err := api.MsigApproveTxnHash(ctx, multisigAddr, txid, proposer, minerAddr, big.Zero(), sender, uint64(miner.Methods.WithdrawBalance), sp)
if err != nil {
return xerrors.Errorf("approving message: %w", err)
}
fmt.Fprintln(cctx.App.Writer, "Approve Message CID:", acid)
// wait for it to get mined into a block
wait, err := api.StateWaitMsg(ctx, acid, build.MessageConfidence)
if err != nil {
return err
}
// check it executed successfully
if wait.Receipt.ExitCode != 0 {
fmt.Fprintln(cctx.App.Writer, "Approve owner change tx failed!")
return err
}
var retval msig5.ApproveReturn
if err := retval.UnmarshalCBOR(bytes.NewReader(wait.Receipt.Return)); err != nil {
return fmt.Errorf("failed to unmarshal approve return value: %w", err)
}
if retval.Applied {
fmt.Printf("Transaction was executed with the approve\n")
fmt.Printf("Exit Code: %d\n", retval.Code)
fmt.Printf("Return Value: %x\n", retval.Ret)
} else {
fmt.Println("Transaction was approved, but not executed")
}
return nil
},
}
var mmProposeChangeOwner = &cli.Command{
Name: "propose-change-owner",
Usage: "Propose an owner address change",
ArgsUsage: "[newOwner]",
Action: func(cctx *cli.Context) error {
if !cctx.Args().Present() {
return fmt.Errorf("must pass new owner address")
}
api, closer, err := lcli.GetFullNodeAPI(cctx)
if err != nil {
return err
}
defer closer()
ctx := lcli.ReqContext(cctx)
multisigAddr, sender, minerAddr, err := getInputs(cctx)
if err != nil {
return err
}
na, err := address.NewFromString(cctx.Args().First())
if err != nil {
return err
}
newAddr, err := api.StateLookupID(ctx, na, types.EmptyTSK)
if err != nil {
return err
}
mi, err := api.StateMinerInfo(ctx, minerAddr, types.EmptyTSK)
if err != nil {
return err
}
if mi.Owner == newAddr {
return fmt.Errorf("owner address already set to %s", na)
}
sp, err := actors.SerializeParams(&newAddr)
if err != nil {
return xerrors.Errorf("serializing params: %w", err)
}
pcid, err := api.MsigPropose(ctx, multisigAddr, minerAddr, big.Zero(), sender, uint64(miner.Methods.ChangeOwnerAddress), sp)
if err != nil {
return xerrors.Errorf("proposing message: %w", err)
}
fmt.Fprintln(cctx.App.Writer, "Propose Message CID:", pcid)
// wait for it to get mined into a block
wait, err := api.StateWaitMsg(ctx, pcid, build.MessageConfidence)
if err != nil {
return err
}
// check it executed successfully
if wait.Receipt.ExitCode != 0 {
fmt.Fprintln(cctx.App.Writer, "Propose owner change tx failed!")
return err
}
var retval msig5.ProposeReturn
if err := retval.UnmarshalCBOR(bytes.NewReader(wait.Receipt.Return)); err != nil {
return fmt.Errorf("failed to unmarshal propose return value: %w", err)
}
fmt.Printf("Transaction ID: %d\n", retval.TxnID)
if retval.Applied {
fmt.Printf("Transaction was executed during propose\n")
fmt.Printf("Exit Code: %d\n", retval.Code)
fmt.Printf("Return Value: %x\n", retval.Ret)
}
return nil
},
}
var mmApproveChangeOwner = &cli.Command{
Name: "approve-change-owner",
Usage: "Approve an owner address change",
ArgsUsage: "[newOwner txnId proposer]",
Action: func(cctx *cli.Context) error {
if cctx.NArg() != 3 {
return fmt.Errorf("must pass new owner address, txn Id, and proposer address")
}
api, closer, err := lcli.GetFullNodeAPI(cctx)
if err != nil {
return err
}
defer closer()
ctx := lcli.ReqContext(cctx)
multisigAddr, sender, minerAddr, err := getInputs(cctx)
if err != nil {
return err
}
na, err := address.NewFromString(cctx.Args().First())
if err != nil {
return err
}
newAddr, err := api.StateLookupID(ctx, na, types.EmptyTSK)
if err != nil {
return err
}
txid, err := strconv.ParseUint(cctx.Args().Get(1), 10, 64)
if err != nil {
return err
}
proposer, err := address.NewFromString(cctx.Args().Get(2))
if err != nil {
return err
}
mi, err := api.StateMinerInfo(ctx, minerAddr, types.EmptyTSK)
if err != nil {
return err
}
if mi.Owner == newAddr {
return fmt.Errorf("owner address already set to %s", na)
}
sp, err := actors.SerializeParams(&newAddr)
if err != nil {
return xerrors.Errorf("serializing params: %w", err)
}
acid, err := api.MsigApproveTxnHash(ctx, multisigAddr, txid, proposer, minerAddr, big.Zero(), sender, uint64(miner.Methods.ChangeOwnerAddress), sp)
if err != nil {
return xerrors.Errorf("approving message: %w", err)
}
fmt.Fprintln(cctx.App.Writer, "Approve Message CID:", acid)
// wait for it to get mined into a block
wait, err := api.StateWaitMsg(ctx, acid, build.MessageConfidence)
if err != nil {
return err
}
// check it executed successfully
if wait.Receipt.ExitCode != 0 {
fmt.Fprintln(cctx.App.Writer, "Approve owner change tx failed!")
return err
}
var retval msig5.ApproveReturn
if err := retval.UnmarshalCBOR(bytes.NewReader(wait.Receipt.Return)); err != nil {
return fmt.Errorf("failed to unmarshal approve return value: %w", err)
}
if retval.Applied {
fmt.Printf("Transaction was executed with the approve\n")
fmt.Printf("Exit Code: %d\n", retval.Code)
fmt.Printf("Return Value: %x\n", retval.Ret)
} else {
fmt.Println("Transaction was approved, but not executed")
}
return nil
},
}
func getInputs(cctx *cli.Context) (address.Address, address.Address, address.Address, error) {
multisigAddr, err := address.NewFromString(cctx.String("multisig"))
if err != nil {
return address.Undef, address.Undef, address.Undef, err
}
sender, err := address.NewFromString(cctx.String("from"))
if err != nil {
return address.Undef, address.Undef, address.Undef, err
}
minerAddr, err := address.NewFromString(cctx.String("miner"))
if err != nil {
return address.Undef, address.Undef, address.Undef, err
}
return multisigAddr, sender, minerAddr, nil
}

View File

@ -295,6 +295,7 @@ var stateList = []stateMeta{
{col: color.FgYellow, state: sealing.PreCommitBatchWait},
{col: color.FgYellow, state: sealing.WaitSeed},
{col: color.FgYellow, state: sealing.Committing},
{col: color.FgYellow, state: sealing.CommitFinalize},
{col: color.FgYellow, state: sealing.SubmitCommit},
{col: color.FgYellow, state: sealing.CommitWait},
{col: color.FgYellow, state: sealing.SubmitCommitAggregate},
@ -315,6 +316,7 @@ var stateList = []stateMeta{
{col: color.FgRed, state: sealing.PreCommitFailed},
{col: color.FgRed, state: sealing.ComputeProofFailed},
{col: color.FgRed, state: sealing.CommitFailed},
{col: color.FgRed, state: sealing.CommitFinalizeFailed},
{col: color.FgRed, state: sealing.PackingFailed},
{col: color.FgRed, state: sealing.FinalizeFailed},
{col: color.FgRed, state: sealing.Faulty},

View File

@ -2505,7 +2505,7 @@ using both transaction ID and a hash of the parameters used in the
proposal. This method of approval can be used to ensure you only approve
exactly the transaction you think you are.
It takes the following params: <multisig address>, <proposed message ID>, <proposer address>, <recipient address>, <value to transfer>,
<sender address of the approve msg>, <method to call in the proposed message>, <params to include in the proposed message>
<sender address of the approve msg>, <method to call in the approved message>, <params to include in the proposed message>
Perms: sign

View File

@ -25,7 +25,6 @@ We're happy to announce Lotus X.Y.Z...
First steps:
- [ ] Fork a new branch (`release/vX.Y.Z`) from `master` and make any further release related changes to this branch. If any "non-trivial" changes get added to the release, uncheck all the checkboxes and return to this stage.
- [ ] Prep the changelog using `scripts/mkreleaselog`, and add it to `CHANGELOG.md`
- [ ] Bump the version in `version.go` in the `master` branch to `vX.(Y+1).0-dev`.
Prepping an RC:
@ -93,7 +92,7 @@ Testing an RC:
- [ ] Final preparation
- [ ] Verify that version string in [`version.go`](https://github.com/ipfs/go-ipfs/tree/master/version.go) has been updated.
- [ ] Ensure that [CHANGELOG.md](https://github.com/filecoin-project/lotus/blob/master/CHANGELOG.md) is up to date
- [ ] Ensure that [README.md](https://github.com/filecoin-project/lotus/blob/master/README.md) is up to date
- [ ] Prep the changelog using `scripts/mkreleaselog`, and add it to `CHANGELOG.md`
- [ ] Merge `release-vX.Y.Z` into the `releases` branch.
- [ ] Tag this merge commit (on the `releases` branch) with `vX.Y.Z`
- [ ] Cut the release [here](https://github.com/filecoin-project/lotus/releases/new?prerelease=true&target=releases).

2
extern/filecoin-ffi vendored

@ -1 +1 @@
Subproject commit 8b97bd8230b77bd32f4f27e4766a6d8a03b4e801
Subproject commit d2e3aa7d61501d69bed6e898de13d1312b021e62

View File

@ -31,6 +31,7 @@ import (
"github.com/filecoin-project/specs-storage/storage"
ffi "github.com/filecoin-project/filecoin-ffi"
"github.com/filecoin-project/filecoin-ffi/generated"
"github.com/filecoin-project/lotus/chain/actors/policy"
"github.com/filecoin-project/lotus/extern/sector-storage/ffiwrapper/basicfs"
@ -853,3 +854,87 @@ func TestAddPiece512MPadded(t *testing.T) {
require.Equal(t, "baga6ea4seaqonenxyku4o7hr5xkzbqsceipf6xgli3on54beqbk6k246sbooobq", c.PieceCID.String())
}
func setupLogger(t *testing.T) *bytes.Buffer {
_ = os.Setenv("RUST_LOG", "info")
var bb bytes.Buffer
r, w, err := os.Pipe()
if err != nil {
t.Fatal(err)
}
go func() {
_, _ = io.Copy(&bb, r)
runtime.KeepAlive(w)
}()
resp := generated.FilInitLogFd(int32(w.Fd()))
resp.Deref()
defer generated.FilDestroyInitLogFdResponse(resp)
if resp.StatusCode != generated.FCPResponseStatusFCPNoError {
t.Fatal(generated.RawString(resp.ErrorMsg).Copy())
}
return &bb
}
func TestMulticoreSDR(t *testing.T) {
if os.Getenv("TEST_RUSTPROOFS_LOGS") != "1" {
t.Skip("skipping test without TEST_RUSTPROOFS_LOGS=1")
}
rustLogger := setupLogger(t)
getGrothParamFileAndVerifyingKeys(sectorSize)
dir, err := ioutil.TempDir("", "sbtest")
if err != nil {
t.Fatal(err)
}
miner := abi.ActorID(123)
sp := &basicfs.Provider{
Root: dir,
}
sb, err := New(sp)
if err != nil {
t.Fatalf("%+v", err)
}
cleanup := func() {
if t.Failed() {
fmt.Printf("not removing %s\n", dir)
return
}
if err := os.RemoveAll(dir); err != nil {
t.Error(err)
}
}
defer cleanup()
si := storage.SectorRef{
ID: abi.SectorID{Miner: miner, Number: 1},
ProofType: sealProofType,
}
s := seal{ref: si}
// check multicore
_ = os.Setenv("FIL_PROOFS_USE_MULTICORE_SDR", "1")
rustLogger.Reset()
s.precommit(t, sb, si, func() {})
ok := false
for _, s := range strings.Split(rustLogger.String(), "\n") {
if strings.Contains(s, "create_label::multi") {
ok = true
break
}
}
require.True(t, ok)
}

View File

@ -7,6 +7,10 @@ import (
"sync"
"time"
"github.com/filecoin-project/go-state-types/network"
"github.com/filecoin-project/lotus/chain/actors"
"github.com/ipfs/go-cid"
"golang.org/x/xerrors"
@ -23,23 +27,28 @@ import (
"github.com/filecoin-project/lotus/chain/actors/policy"
"github.com/filecoin-project/lotus/extern/sector-storage/ffiwrapper"
"github.com/filecoin-project/lotus/extern/storage-sealing/sealiface"
"github.com/filecoin-project/lotus/node/config"
)
const arp = abi.RegisteredAggregationProof_SnarkPackV1
//go:generate go run github.com/golang/mock/mockgen -destination=mocks/mock_commit_batcher.go -package=mocks . CommitBatcherApi
type CommitBatcherApi interface {
SendMsg(ctx context.Context, from, to address.Address, method abi.MethodNum, value, maxFee abi.TokenAmount, params []byte) (cid.Cid, error)
StateMinerInfo(context.Context, address.Address, TipSetToken) (miner.MinerInfo, error)
ChainHead(ctx context.Context) (TipSetToken, abi.ChainEpoch, error)
ChainBaseFee(context.Context, TipSetToken) (abi.TokenAmount, error)
StateSectorPreCommitInfo(ctx context.Context, maddr address.Address, sectorNumber abi.SectorNumber, tok TipSetToken) (*miner.SectorPreCommitOnChainInfo, error)
StateMinerInitialPledgeCollateral(context.Context, address.Address, miner.SectorPreCommitInfo, TipSetToken) (big.Int, error)
StateNetworkVersion(ctx context.Context, tok TipSetToken) (network.Version, error)
}
type AggregateInput struct {
spt abi.RegisteredSealProof
info proof5.AggregateSealVerifyInfo
proof []byte
Spt abi.RegisteredSealProof
Info proof5.AggregateSealVerifyInfo
Proof []byte
}
type CommitBatcher struct {
@ -47,11 +56,11 @@ type CommitBatcher struct {
maddr address.Address
mctx context.Context
addrSel AddrSel
feeCfg FeeConfig
feeCfg config.MinerFeeConfig
getConfig GetSealingConfigFunc
prover ffiwrapper.Prover
deadlines map[abi.SectorNumber]time.Time
cutoffs map[abi.SectorNumber]time.Time
todo map[abi.SectorNumber]AggregateInput
waiting map[abi.SectorNumber][]chan sealiface.CommitBatchRes
@ -60,7 +69,7 @@ type CommitBatcher struct {
lk sync.Mutex
}
func NewCommitBatcher(mctx context.Context, maddr address.Address, api CommitBatcherApi, addrSel AddrSel, feeCfg FeeConfig, getConfig GetSealingConfigFunc, prov ffiwrapper.Prover) *CommitBatcher {
func NewCommitBatcher(mctx context.Context, maddr address.Address, api CommitBatcherApi, addrSel AddrSel, feeCfg config.MinerFeeConfig, getConfig GetSealingConfigFunc, prov ffiwrapper.Prover) *CommitBatcher {
b := &CommitBatcher{
api: api,
maddr: maddr,
@ -70,7 +79,7 @@ func NewCommitBatcher(mctx context.Context, maddr address.Address, api CommitBat
getConfig: getConfig,
prover: prov,
deadlines: map[abi.SectorNumber]time.Time{},
cutoffs: map[abi.SectorNumber]time.Time{},
todo: map[abi.SectorNumber]AggregateInput{},
waiting: map[abi.SectorNumber][]chan sealiface.CommitBatchRes{},
@ -132,30 +141,30 @@ func (b *CommitBatcher) batchWait(maxWait, slack time.Duration) <-chan time.Time
return nil
}
var deadline time.Time
var cutoff time.Time
for sn := range b.todo {
sectorDeadline := b.deadlines[sn]
if deadline.IsZero() || (!sectorDeadline.IsZero() && sectorDeadline.Before(deadline)) {
deadline = sectorDeadline
sectorCutoff := b.cutoffs[sn]
if cutoff.IsZero() || (!sectorCutoff.IsZero() && sectorCutoff.Before(cutoff)) {
cutoff = sectorCutoff
}
}
for sn := range b.waiting {
sectorDeadline := b.deadlines[sn]
if deadline.IsZero() || (!sectorDeadline.IsZero() && sectorDeadline.Before(deadline)) {
deadline = sectorDeadline
sectorCutoff := b.cutoffs[sn]
if cutoff.IsZero() || (!sectorCutoff.IsZero() && sectorCutoff.Before(cutoff)) {
cutoff = sectorCutoff
}
}
if deadline.IsZero() {
if cutoff.IsZero() {
return time.After(maxWait)
}
deadline = deadline.Add(-slack)
if deadline.Before(now) {
cutoff = cutoff.Add(-slack)
if cutoff.Before(now) {
return time.After(time.Nanosecond) // can't return 0
}
wait := deadline.Sub(now)
wait := cutoff.Sub(now)
if wait > maxWait {
wait = maxWait
}
@ -208,7 +217,7 @@ func (b *CommitBatcher) maybeStartBatch(notif, after bool) ([]sealiface.CommitBa
delete(b.waiting, sn)
delete(b.todo, sn)
delete(b.deadlines, sn)
delete(b.cutoffs, sn)
}
}
@ -239,6 +248,8 @@ func (b *CommitBatcher) processBatch(cfg sealiface.Config) ([]sealiface.CommitBa
break
}
res.Sectors = append(res.Sectors, id)
sc, err := b.getSectorCollateral(id, tok)
if err != nil {
res.FailedSectors[id] = err.Error()
@ -247,9 +258,8 @@ func (b *CommitBatcher) processBatch(cfg sealiface.Config) ([]sealiface.CommitBa
collateral = big.Add(collateral, sc)
res.Sectors = append(res.Sectors, id)
params.SectorNumbers.Set(uint64(id))
infos = append(infos, p.info)
infos = append(infos, p.Info)
}
sort.Slice(infos, func(i, j int) bool {
@ -257,7 +267,7 @@ func (b *CommitBatcher) processBatch(cfg sealiface.Config) ([]sealiface.CommitBa
})
for _, info := range infos {
proofs = append(proofs, b.todo[info.Number].proof)
proofs = append(proofs, b.todo[info.Number].Proof)
}
mid, err := address.IDFromAddress(b.maddr)
@ -267,7 +277,7 @@ func (b *CommitBatcher) processBatch(cfg sealiface.Config) ([]sealiface.CommitBa
params.AggregateProof, err = b.prover.AggregateSealProofs(proof5.AggregateSealVerifyProofAndInfos{
Miner: abi.ActorID(mid),
SealProof: b.todo[infos[0].Number].spt,
SealProof: b.todo[infos[0].Number].Spt,
AggregateProof: arp,
Infos: infos,
}, proofs)
@ -285,14 +295,29 @@ func (b *CommitBatcher) processBatch(cfg sealiface.Config) ([]sealiface.CommitBa
return []sealiface.CommitBatchRes{res}, xerrors.Errorf("couldn't get miner info: %w", err)
}
goodFunds := big.Add(b.feeCfg.MaxCommitGasFee, collateral)
maxFee := b.feeCfg.MaxCommitBatchGasFee.FeeForSectors(len(infos))
bf, err := b.api.ChainBaseFee(b.mctx, tok)
if err != nil {
return []sealiface.CommitBatchRes{res}, xerrors.Errorf("couldn't get base fee: %w", err)
}
nv, err := b.api.StateNetworkVersion(b.mctx, tok)
if err != nil {
log.Errorf("getting network version: %s", err)
return []sealiface.CommitBatchRes{res}, xerrors.Errorf("getting network version: %s", err)
}
aggFee := policy.AggregateNetworkFee(nv, len(infos), bf)
goodFunds := big.Add(maxFee, big.Add(collateral, aggFee))
from, _, err := b.addrSel(b.mctx, mi, api.CommitAddr, goodFunds, collateral)
if err != nil {
return []sealiface.CommitBatchRes{res}, xerrors.Errorf("no good address found: %w", err)
}
mcid, err := b.api.SendMsg(b.mctx, from, b.maddr, miner.Methods.ProveCommitAggregate, collateral, b.feeCfg.MaxCommitGasFee, enc.Bytes())
mcid, err := b.api.SendMsg(b.mctx, from, b.maddr, miner.Methods.ProveCommitAggregate, collateral, maxFee, enc.Bytes())
if err != nil {
return []sealiface.CommitBatchRes{res}, xerrors.Errorf("sending message failed: %w", err)
}
@ -340,7 +365,7 @@ func (b *CommitBatcher) processSingle(mi miner.MinerInfo, sn abi.SectorNumber, i
enc := new(bytes.Buffer)
params := &miner.ProveCommitSectorParams{
SectorNumber: sn,
Proof: info.proof,
Proof: info.Proof,
}
if err := params.MarshalCBOR(enc); err != nil {
@ -352,14 +377,14 @@ func (b *CommitBatcher) processSingle(mi miner.MinerInfo, sn abi.SectorNumber, i
return cid.Undef, err
}
goodFunds := big.Add(collateral, b.feeCfg.MaxCommitGasFee)
goodFunds := big.Add(collateral, big.Int(b.feeCfg.MaxCommitGasFee))
from, _, err := b.addrSel(b.mctx, mi, api.CommitAddr, goodFunds, collateral)
if err != nil {
return cid.Undef, xerrors.Errorf("no good address to send commit message from: %w", err)
}
mcid, err := b.api.SendMsg(b.mctx, from, b.maddr, miner.Methods.ProveCommitSector, collateral, b.feeCfg.MaxCommitGasFee, enc.Bytes())
mcid, err := b.api.SendMsg(b.mctx, from, b.maddr, miner.Methods.ProveCommitSector, collateral, big.Int(b.feeCfg.MaxCommitGasFee), enc.Bytes())
if err != nil {
return cid.Undef, xerrors.Errorf("pushing message to mpool: %w", err)
}
@ -369,16 +394,15 @@ func (b *CommitBatcher) processSingle(mi miner.MinerInfo, sn abi.SectorNumber, i
// register commit, wait for batch message, return message CID
func (b *CommitBatcher) AddCommit(ctx context.Context, s SectorInfo, in AggregateInput) (res sealiface.CommitBatchRes, err error) {
_, curEpoch, err := b.api.ChainHead(b.mctx)
if err != nil {
log.Errorf("getting chain head: %s", err)
return sealiface.CommitBatchRes{}, nil
}
sn := s.SectorNumber
cu, err := b.getCommitCutoff(s)
if err != nil {
return sealiface.CommitBatchRes{}, err
}
b.lk.Lock()
b.deadlines[sn] = getSectorDeadline(curEpoch, s)
b.cutoffs[sn] = cu
b.todo[sn] = in
sent := make(chan sealiface.CommitBatchRes, 1)
@ -426,7 +450,7 @@ func (b *CommitBatcher) Pending(ctx context.Context) ([]abi.SectorID, error) {
for _, s := range b.todo {
res = append(res, abi.SectorID{
Miner: abi.ActorID(mid),
Number: s.info.Number,
Number: s.Info.Number,
})
}
@ -452,24 +476,43 @@ func (b *CommitBatcher) Stop(ctx context.Context) error {
}
}
func getSectorDeadline(curEpoch abi.ChainEpoch, si SectorInfo) time.Time {
deadlineEpoch := si.TicketEpoch + policy.MaxPreCommitRandomnessLookback
// TODO: If this returned epochs, it would make testing much easier
func (b *CommitBatcher) getCommitCutoff(si SectorInfo) (time.Time, error) {
tok, curEpoch, err := b.api.ChainHead(b.mctx)
if err != nil {
return time.Now(), xerrors.Errorf("getting chain head: %s", err)
}
nv, err := b.api.StateNetworkVersion(b.mctx, tok)
if err != nil {
log.Errorf("getting network version: %s", err)
return time.Now(), xerrors.Errorf("getting network version: %s", err)
}
pci, err := b.api.StateSectorPreCommitInfo(b.mctx, b.maddr, si.SectorNumber, tok)
if err != nil {
log.Errorf("getting precommit info: %s", err)
return time.Now(), err
}
cutoffEpoch := pci.PreCommitEpoch + policy.GetMaxProveCommitDuration(actors.VersionForNetwork(nv), si.SectorType)
for _, p := range si.Pieces {
if p.DealInfo == nil {
continue
}
startEpoch := p.DealInfo.DealSchedule.StartEpoch
if startEpoch < deadlineEpoch {
deadlineEpoch = startEpoch
if startEpoch < cutoffEpoch {
cutoffEpoch = startEpoch
}
}
if deadlineEpoch <= curEpoch {
return time.Now()
if cutoffEpoch <= curEpoch {
return time.Now(), nil
}
return time.Now().Add(time.Duration(deadlineEpoch-curEpoch) * time.Duration(build.BlockDelaySecs) * time.Second)
return time.Now().Add(time.Duration(cutoffEpoch-curEpoch) * time.Duration(build.BlockDelaySecs) * time.Second), nil
}
func (b *CommitBatcher) getSectorCollateral(sn abi.SectorNumber, tok TipSetToken) (abi.TokenAmount, error) {

View File

@ -0,0 +1,299 @@
package sealing_test
import (
"bytes"
"context"
"sort"
"sync"
"testing"
"time"
"github.com/golang/mock/gomock"
"github.com/stretchr/testify/require"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/go-state-types/big"
"github.com/filecoin-project/go-state-types/network"
miner5 "github.com/filecoin-project/specs-actors/v5/actors/builtin/miner"
proof5 "github.com/filecoin-project/specs-actors/v5/actors/runtime/proof"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/chain/actors/builtin/miner"
"github.com/filecoin-project/lotus/extern/sector-storage/ffiwrapper"
sealing "github.com/filecoin-project/lotus/extern/storage-sealing"
"github.com/filecoin-project/lotus/extern/storage-sealing/mocks"
"github.com/filecoin-project/lotus/extern/storage-sealing/sealiface"
)
func TestCommitBatcher(t *testing.T) {
t0123, err := address.NewFromString("t0123")
require.NoError(t, err)
ctx := context.Background()
as := func(ctx context.Context, mi miner.MinerInfo, use api.AddrUse, goodFunds, minFunds abi.TokenAmount) (address.Address, abi.TokenAmount, error) {
return t0123, big.Zero(), nil
}
maxBatch := miner5.MaxAggregatedSectors
minBatch := miner5.MinAggregatedSectors
cfg := func() (sealiface.Config, error) {
return sealiface.Config{
MaxWaitDealsSectors: 2,
MaxSealingSectors: 0,
MaxSealingSectorsForDeals: 0,
WaitDealsDelay: time.Hour * 6,
AlwaysKeepUnsealedCopy: true,
BatchPreCommits: true,
MinPreCommitBatch: 1,
MaxPreCommitBatch: miner5.PreCommitSectorBatchMaxSize,
PreCommitBatchWait: 24 * time.Hour,
PreCommitBatchSlack: 3 * time.Hour,
AggregateCommits: true,
MinCommitBatch: minBatch,
MaxCommitBatch: maxBatch,
CommitBatchWait: 24 * time.Hour,
CommitBatchSlack: 1 * time.Hour,
TerminateBatchMin: 1,
TerminateBatchMax: 100,
TerminateBatchWait: 5 * time.Minute,
}, nil
}
type promise func(t *testing.T)
type action func(t *testing.T, s *mocks.MockCommitBatcherApi, pcb *sealing.CommitBatcher) promise
actions := func(as ...action) action {
return func(t *testing.T, s *mocks.MockCommitBatcherApi, pcb *sealing.CommitBatcher) promise {
var ps []promise
for _, a := range as {
p := a(t, s, pcb)
if p != nil {
ps = append(ps, p)
}
}
if len(ps) > 0 {
return func(t *testing.T) {
for _, p := range ps {
p(t)
}
}
}
return nil
}
}
addSector := func(sn abi.SectorNumber) action {
return func(t *testing.T, s *mocks.MockCommitBatcherApi, pcb *sealing.CommitBatcher) promise {
var pcres sealiface.CommitBatchRes
var pcerr error
done := sync.Mutex{}
done.Lock()
si := sealing.SectorInfo{
SectorNumber: sn,
}
s.EXPECT().ChainHead(gomock.Any()).Return(nil, abi.ChainEpoch(1), nil)
s.EXPECT().StateNetworkVersion(gomock.Any(), gomock.Any()).Return(network.Version13, nil)
s.EXPECT().StateSectorPreCommitInfo(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(&miner.SectorPreCommitOnChainInfo{
PreCommitDeposit: big.Zero(),
}, nil)
go func() {
defer done.Unlock()
pcres, pcerr = pcb.AddCommit(ctx, si, sealing.AggregateInput{
Info: proof5.AggregateSealVerifyInfo{
Number: sn,
},
})
}()
return func(t *testing.T) {
done.Lock()
require.NoError(t, pcerr)
require.Empty(t, pcres.Error)
require.Contains(t, pcres.Sectors, si.SectorNumber)
}
}
}
addSectors := func(sectors []abi.SectorNumber) action {
as := make([]action, len(sectors))
for i, sector := range sectors {
as[i] = addSector(sector)
}
return actions(as...)
}
waitPending := func(n int) action {
return func(t *testing.T, s *mocks.MockCommitBatcherApi, pcb *sealing.CommitBatcher) promise {
require.Eventually(t, func() bool {
p, err := pcb.Pending(ctx)
require.NoError(t, err)
return len(p) == n
}, time.Second*5, 10*time.Millisecond)
return nil
}
}
expectSend := func(expect []abi.SectorNumber) action {
return func(t *testing.T, s *mocks.MockCommitBatcherApi, pcb *sealing.CommitBatcher) promise {
s.EXPECT().StateMinerInfo(gomock.Any(), gomock.Any(), gomock.Any()).Return(miner.MinerInfo{Owner: t0123, Worker: t0123}, nil)
ti := len(expect)
batch := false
if ti >= minBatch {
batch = true
ti = 1
}
s.EXPECT().ChainHead(gomock.Any()).Return(nil, abi.ChainEpoch(1), nil)
s.EXPECT().StateSectorPreCommitInfo(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(&miner.SectorPreCommitOnChainInfo{
PreCommitDeposit: big.Zero(),
}, nil).Times(len(expect))
s.EXPECT().StateMinerInitialPledgeCollateral(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(big.Zero(), nil).Times(len(expect))
if batch {
s.EXPECT().StateNetworkVersion(gomock.Any(), gomock.Any()).Return(network.Version13, nil)
s.EXPECT().ChainBaseFee(gomock.Any(), gomock.Any()).Return(big.NewInt(2000), nil)
}
s.EXPECT().SendMsg(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), funMatcher(func(i interface{}) bool {
b := i.([]byte)
if batch {
var params miner5.ProveCommitAggregateParams
require.NoError(t, params.UnmarshalCBOR(bytes.NewReader(b)))
for _, number := range expect {
set, err := params.SectorNumbers.IsSet(uint64(number))
require.NoError(t, err)
require.True(t, set)
}
} else {
var params miner5.ProveCommitSectorParams
require.NoError(t, params.UnmarshalCBOR(bytes.NewReader(b)))
}
return true
})).Times(ti)
return nil
}
}
flush := func(expect []abi.SectorNumber) action {
return func(t *testing.T, s *mocks.MockCommitBatcherApi, pcb *sealing.CommitBatcher) promise {
_ = expectSend(expect)(t, s, pcb)
batch := len(expect) >= minBatch
r, err := pcb.Flush(ctx)
require.NoError(t, err)
if batch {
require.Len(t, r, 1)
require.Empty(t, r[0].Error)
sort.Slice(r[0].Sectors, func(i, j int) bool {
return r[0].Sectors[i] < r[0].Sectors[j]
})
require.Equal(t, expect, r[0].Sectors)
} else {
require.Len(t, r, len(expect))
for _, res := range r {
require.Len(t, res.Sectors, 1)
require.Empty(t, res.Error)
}
sort.Slice(r, func(i, j int) bool {
return r[i].Sectors[0] < r[j].Sectors[0]
})
for i, res := range r {
require.Equal(t, abi.SectorNumber(i), res.Sectors[0])
}
}
return nil
}
}
getSectors := func(n int) []abi.SectorNumber {
out := make([]abi.SectorNumber, n)
for i := range out {
out[i] = abi.SectorNumber(i)
}
return out
}
tcs := map[string]struct {
actions []action
}{
"addSingle": {
actions: []action{
addSector(0),
waitPending(1),
flush([]abi.SectorNumber{0}),
},
},
"addTwo": {
actions: []action{
addSectors(getSectors(2)),
waitPending(2),
flush(getSectors(2)),
},
},
"addAte": {
actions: []action{
addSectors(getSectors(8)),
waitPending(8),
flush(getSectors(8)),
},
},
"addMax": {
actions: []action{
expectSend(getSectors(maxBatch)),
addSectors(getSectors(maxBatch)),
},
},
}
for name, tc := range tcs {
tc := tc
t.Run(name, func(t *testing.T) {
// create go mock controller here
mockCtrl := gomock.NewController(t)
// when test is done, assert expectations on all mock objects.
defer mockCtrl.Finish()
// create them mocks
pcapi := mocks.NewMockCommitBatcherApi(mockCtrl)
pcb := sealing.NewCommitBatcher(ctx, t0123, pcapi, as, fc, cfg, &fakeProver{})
var promises []promise
for _, a := range tc.actions {
p := a(t, pcapi, pcb)
if p != nil {
promises = append(promises, p)
}
}
for _, p := range promises {
p(t)
}
err := pcb.Stop(ctx)
require.NoError(t, err)
})
}
}
type fakeProver struct{}
func (f fakeProver) AggregateSealProofs(aggregateInfo proof5.AggregateSealVerifyProofAndInfos, proofs [][]byte) ([]byte, error) {
return []byte("Trust me, I'm a proof"), nil
}
var _ ffiwrapper.Prover = &fakeProver{}

View File

@ -103,6 +103,10 @@ var fsmPlanners = map[SectorState]func(events []statemachine.Event, state *Secto
on(SectorChainPreCommitFailed{}, PreCommitFailed),
),
Committing: planCommitting,
CommitFinalize: planOne(
on(SectorFinalized{}, SubmitCommit),
on(SectorFinalizeFailed{}, CommitFinalizeFailed),
),
SubmitCommit: planOne(
on(SectorCommitSubmitted{}, CommitWait),
on(SectorSubmitCommitAggregate{}, SubmitCommitAggregate),
@ -151,6 +155,9 @@ var fsmPlanners = map[SectorState]func(events []statemachine.Event, state *Secto
on(SectorRetryComputeProof{}, Committing),
on(SectorSealPreCommit1Failed{}, SealPreCommit1Failed),
),
CommitFinalizeFailed: planOne(
on(SectorRetryFinalize{}, CommitFinalizeFailed),
),
CommitFailed: planOne(
on(SectorSealPreCommit1Failed{}, SealPreCommit1Failed),
on(SectorRetryWaitSeed{}, WaitSeed),
@ -379,6 +386,8 @@ func (m *Sealing) plan(events []statemachine.Event, state *SectorInfo) (func(sta
fallthrough
case CommitWait:
return m.handleCommitWait, processed, nil
case CommitFinalize:
fallthrough
case FinalizeSector:
return m.handleFinalizeSector, processed, nil
@ -393,6 +402,8 @@ func (m *Sealing) plan(events []statemachine.Event, state *SectorInfo) (func(sta
return m.handleComputeProofFailed, processed, nil
case CommitFailed:
return m.handleCommitFailed, processed, nil
case CommitFinalizeFailed:
fallthrough
case FinalizeFailed:
return m.handleFinalizeFailed, processed, nil
case PackingFailed: // DEPRECATED: remove this for the next reset
@ -482,6 +493,9 @@ func planCommitting(events []statemachine.Event, state *SectorInfo) (uint64, err
case SectorCommitted: // the normal case
e.apply(state)
state.State = SubmitCommit
case SectorProofReady: // early finalize
e.apply(state)
state.State = CommitFinalize
case SectorSeedReady: // seed changed :/
if e.SeedEpoch == state.SeedEpoch && bytes.Equal(e.SeedValue, state.SeedValue) {
log.Warnf("planCommitting: got SectorSeedReady, but the seed didn't change")
@ -508,6 +522,8 @@ func planCommitting(events []statemachine.Event, state *SectorInfo) (uint64, err
}
func (m *Sealing) restartSectors(ctx context.Context) error {
defer m.startupWait.Done()
trackedSectors, err := m.ListSectors()
if err != nil {
log.Errorf("loading sector list: %+v", err)
@ -525,6 +541,7 @@ func (m *Sealing) restartSectors(ctx context.Context) error {
}
func (m *Sealing) ForceSectorState(ctx context.Context, id abi.SectorNumber, state SectorState) error {
m.startupWait.Wait()
return m.sectors.Send(id, SectorForceState{state})
}

View File

@ -245,6 +245,15 @@ func (evt SectorCommitted) apply(state *SectorInfo) {
state.Proof = evt.Proof
}
// like SectorCommitted, but finalizes before sending the proof to the chain
type SectorProofReady struct {
Proof []byte
}
func (evt SectorProofReady) apply(state *SectorInfo) {
state.Proof = evt.Proof
}
type SectorSubmitCommitAggregate struct{}
func (evt SectorSubmitCommitAggregate) apply(*SectorInfo) {}

View File

@ -87,6 +87,73 @@ func TestHappyPath(t *testing.T) {
}
}
func TestHappyPathFinalizeEarly(t *testing.T) {
var notif []struct{ before, after SectorInfo }
ma, _ := address.NewIDAddress(55151)
m := test{
s: &Sealing{
maddr: ma,
stats: SectorStats{
bySector: map[abi.SectorID]statSectorState{},
},
notifee: func(before, after SectorInfo) {
notif = append(notif, struct{ before, after SectorInfo }{before, after})
},
},
t: t,
state: &SectorInfo{State: Packing},
}
m.planSingle(SectorPacked{})
require.Equal(m.t, m.state.State, GetTicket)
m.planSingle(SectorTicket{})
require.Equal(m.t, m.state.State, PreCommit1)
m.planSingle(SectorPreCommit1{})
require.Equal(m.t, m.state.State, PreCommit2)
m.planSingle(SectorPreCommit2{})
require.Equal(m.t, m.state.State, PreCommitting)
m.planSingle(SectorPreCommitted{})
require.Equal(m.t, m.state.State, PreCommitWait)
m.planSingle(SectorPreCommitLanded{})
require.Equal(m.t, m.state.State, WaitSeed)
m.planSingle(SectorSeedReady{})
require.Equal(m.t, m.state.State, Committing)
m.planSingle(SectorProofReady{})
require.Equal(m.t, m.state.State, CommitFinalize)
m.planSingle(SectorFinalized{})
require.Equal(m.t, m.state.State, SubmitCommit)
m.planSingle(SectorSubmitCommitAggregate{})
require.Equal(m.t, m.state.State, SubmitCommitAggregate)
m.planSingle(SectorCommitAggregateSent{})
require.Equal(m.t, m.state.State, CommitWait)
m.planSingle(SectorProving{})
require.Equal(m.t, m.state.State, FinalizeSector)
m.planSingle(SectorFinalized{})
require.Equal(m.t, m.state.State, Proving)
expected := []SectorState{Packing, GetTicket, PreCommit1, PreCommit2, PreCommitting, PreCommitWait, WaitSeed, Committing, CommitFinalize, SubmitCommit, SubmitCommitAggregate, CommitWait, FinalizeSector, Proving}
for i, n := range notif {
if n.before.State != expected[i] {
t.Fatalf("expected before state: %s, got: %s", expected[i], n.before.State)
}
if n.after.State != expected[i+1] {
t.Fatalf("expected after state: %s, got: %s", expected[i+1], n.after.State)
}
}
}
func TestSeedRevert(t *testing.T) {
ma, _ := address.NewIDAddress(55151)
m := test{

View File

@ -9,6 +9,8 @@ import (
)
func (m *Sealing) PledgeSector(ctx context.Context) (storage.SectorRef, error) {
m.startupWait.Wait()
m.inputLk.Lock()
defer m.inputLk.Unlock()

View File

@ -394,6 +394,7 @@ func (m *Sealing) updateInput(ctx context.Context, sp abi.RegisteredSealProof) e
}
func (m *Sealing) tryCreateDealSector(ctx context.Context, sp abi.RegisteredSealProof) error {
m.startupWait.Wait()
if m.creating != nil {
return nil // new sector is being created right now
}
@ -446,7 +447,9 @@ func (m *Sealing) createSector(ctx context.Context, cfg sealiface.Config, sp abi
}
func (m *Sealing) StartPacking(sid abi.SectorNumber) error {
m.startupWait.Wait()
log.Infow("starting to seal deal sector", "sector", sid, "trigger", "user")
return m.sectors.Send(uint64(sid), SectorStartPacking{})
}

View File

@ -0,0 +1,149 @@
// Code generated by MockGen. DO NOT EDIT.
// Source: github.com/filecoin-project/lotus/extern/storage-sealing (interfaces: CommitBatcherApi)
// Package mocks is a generated GoMock package.
package mocks
import (
context "context"
reflect "reflect"
address "github.com/filecoin-project/go-address"
abi "github.com/filecoin-project/go-state-types/abi"
big "github.com/filecoin-project/go-state-types/big"
network "github.com/filecoin-project/go-state-types/network"
miner "github.com/filecoin-project/lotus/chain/actors/builtin/miner"
sealing "github.com/filecoin-project/lotus/extern/storage-sealing"
miner0 "github.com/filecoin-project/specs-actors/actors/builtin/miner"
gomock "github.com/golang/mock/gomock"
cid "github.com/ipfs/go-cid"
)
// MockCommitBatcherApi is a mock of CommitBatcherApi interface.
type MockCommitBatcherApi struct {
ctrl *gomock.Controller
recorder *MockCommitBatcherApiMockRecorder
}
// MockCommitBatcherApiMockRecorder is the mock recorder for MockCommitBatcherApi.
type MockCommitBatcherApiMockRecorder struct {
mock *MockCommitBatcherApi
}
// NewMockCommitBatcherApi creates a new mock instance.
func NewMockCommitBatcherApi(ctrl *gomock.Controller) *MockCommitBatcherApi {
mock := &MockCommitBatcherApi{ctrl: ctrl}
mock.recorder = &MockCommitBatcherApiMockRecorder{mock}
return mock
}
// EXPECT returns an object that allows the caller to indicate expected use.
func (m *MockCommitBatcherApi) EXPECT() *MockCommitBatcherApiMockRecorder {
return m.recorder
}
// ChainBaseFee mocks base method.
func (m *MockCommitBatcherApi) ChainBaseFee(arg0 context.Context, arg1 sealing.TipSetToken) (big.Int, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "ChainBaseFee", arg0, arg1)
ret0, _ := ret[0].(big.Int)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// ChainBaseFee indicates an expected call of ChainBaseFee.
func (mr *MockCommitBatcherApiMockRecorder) ChainBaseFee(arg0, arg1 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ChainBaseFee", reflect.TypeOf((*MockCommitBatcherApi)(nil).ChainBaseFee), arg0, arg1)
}
// ChainHead mocks base method.
func (m *MockCommitBatcherApi) ChainHead(arg0 context.Context) (sealing.TipSetToken, abi.ChainEpoch, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "ChainHead", arg0)
ret0, _ := ret[0].(sealing.TipSetToken)
ret1, _ := ret[1].(abi.ChainEpoch)
ret2, _ := ret[2].(error)
return ret0, ret1, ret2
}
// ChainHead indicates an expected call of ChainHead.
func (mr *MockCommitBatcherApiMockRecorder) ChainHead(arg0 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ChainHead", reflect.TypeOf((*MockCommitBatcherApi)(nil).ChainHead), arg0)
}
// SendMsg mocks base method.
func (m *MockCommitBatcherApi) SendMsg(arg0 context.Context, arg1, arg2 address.Address, arg3 abi.MethodNum, arg4, arg5 big.Int, arg6 []byte) (cid.Cid, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "SendMsg", arg0, arg1, arg2, arg3, arg4, arg5, arg6)
ret0, _ := ret[0].(cid.Cid)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// SendMsg indicates an expected call of SendMsg.
func (mr *MockCommitBatcherApiMockRecorder) SendMsg(arg0, arg1, arg2, arg3, arg4, arg5, arg6 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SendMsg", reflect.TypeOf((*MockCommitBatcherApi)(nil).SendMsg), arg0, arg1, arg2, arg3, arg4, arg5, arg6)
}
// StateMinerInfo mocks base method.
func (m *MockCommitBatcherApi) StateMinerInfo(arg0 context.Context, arg1 address.Address, arg2 sealing.TipSetToken) (miner.MinerInfo, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "StateMinerInfo", arg0, arg1, arg2)
ret0, _ := ret[0].(miner.MinerInfo)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// StateMinerInfo indicates an expected call of StateMinerInfo.
func (mr *MockCommitBatcherApiMockRecorder) StateMinerInfo(arg0, arg1, arg2 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "StateMinerInfo", reflect.TypeOf((*MockCommitBatcherApi)(nil).StateMinerInfo), arg0, arg1, arg2)
}
// StateMinerInitialPledgeCollateral mocks base method.
func (m *MockCommitBatcherApi) StateMinerInitialPledgeCollateral(arg0 context.Context, arg1 address.Address, arg2 miner0.SectorPreCommitInfo, arg3 sealing.TipSetToken) (big.Int, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "StateMinerInitialPledgeCollateral", arg0, arg1, arg2, arg3)
ret0, _ := ret[0].(big.Int)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// StateMinerInitialPledgeCollateral indicates an expected call of StateMinerInitialPledgeCollateral.
func (mr *MockCommitBatcherApiMockRecorder) StateMinerInitialPledgeCollateral(arg0, arg1, arg2, arg3 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "StateMinerInitialPledgeCollateral", reflect.TypeOf((*MockCommitBatcherApi)(nil).StateMinerInitialPledgeCollateral), arg0, arg1, arg2, arg3)
}
// StateNetworkVersion mocks base method.
func (m *MockCommitBatcherApi) StateNetworkVersion(arg0 context.Context, arg1 sealing.TipSetToken) (network.Version, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "StateNetworkVersion", arg0, arg1)
ret0, _ := ret[0].(network.Version)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// StateNetworkVersion indicates an expected call of StateNetworkVersion.
func (mr *MockCommitBatcherApiMockRecorder) StateNetworkVersion(arg0, arg1 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "StateNetworkVersion", reflect.TypeOf((*MockCommitBatcherApi)(nil).StateNetworkVersion), arg0, arg1)
}
// StateSectorPreCommitInfo mocks base method.
func (m *MockCommitBatcherApi) StateSectorPreCommitInfo(arg0 context.Context, arg1 address.Address, arg2 abi.SectorNumber, arg3 sealing.TipSetToken) (*miner.SectorPreCommitOnChainInfo, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "StateSectorPreCommitInfo", arg0, arg1, arg2, arg3)
ret0, _ := ret[0].(*miner.SectorPreCommitOnChainInfo)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// StateSectorPreCommitInfo indicates an expected call of StateSectorPreCommitInfo.
func (mr *MockCommitBatcherApiMockRecorder) StateSectorPreCommitInfo(arg0, arg1, arg2, arg3 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "StateSectorPreCommitInfo", reflect.TypeOf((*MockCommitBatcherApi)(nil).StateSectorPreCommitInfo), arg0, arg1, arg2, arg3)
}

View File

@ -0,0 +1,87 @@
// Code generated by MockGen. DO NOT EDIT.
// Source: github.com/filecoin-project/lotus/extern/storage-sealing (interfaces: PreCommitBatcherApi)
// Package mocks is a generated GoMock package.
package mocks
import (
context "context"
reflect "reflect"
address "github.com/filecoin-project/go-address"
abi "github.com/filecoin-project/go-state-types/abi"
big "github.com/filecoin-project/go-state-types/big"
miner "github.com/filecoin-project/lotus/chain/actors/builtin/miner"
sealing "github.com/filecoin-project/lotus/extern/storage-sealing"
gomock "github.com/golang/mock/gomock"
cid "github.com/ipfs/go-cid"
)
// MockPreCommitBatcherApi is a mock of PreCommitBatcherApi interface.
type MockPreCommitBatcherApi struct {
ctrl *gomock.Controller
recorder *MockPreCommitBatcherApiMockRecorder
}
// MockPreCommitBatcherApiMockRecorder is the mock recorder for MockPreCommitBatcherApi.
type MockPreCommitBatcherApiMockRecorder struct {
mock *MockPreCommitBatcherApi
}
// NewMockPreCommitBatcherApi creates a new mock instance.
func NewMockPreCommitBatcherApi(ctrl *gomock.Controller) *MockPreCommitBatcherApi {
mock := &MockPreCommitBatcherApi{ctrl: ctrl}
mock.recorder = &MockPreCommitBatcherApiMockRecorder{mock}
return mock
}
// EXPECT returns an object that allows the caller to indicate expected use.
func (m *MockPreCommitBatcherApi) EXPECT() *MockPreCommitBatcherApiMockRecorder {
return m.recorder
}
// ChainHead mocks base method.
func (m *MockPreCommitBatcherApi) ChainHead(arg0 context.Context) (sealing.TipSetToken, abi.ChainEpoch, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "ChainHead", arg0)
ret0, _ := ret[0].(sealing.TipSetToken)
ret1, _ := ret[1].(abi.ChainEpoch)
ret2, _ := ret[2].(error)
return ret0, ret1, ret2
}
// ChainHead indicates an expected call of ChainHead.
func (mr *MockPreCommitBatcherApiMockRecorder) ChainHead(arg0 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ChainHead", reflect.TypeOf((*MockPreCommitBatcherApi)(nil).ChainHead), arg0)
}
// SendMsg mocks base method.
func (m *MockPreCommitBatcherApi) SendMsg(arg0 context.Context, arg1, arg2 address.Address, arg3 abi.MethodNum, arg4, arg5 big.Int, arg6 []byte) (cid.Cid, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "SendMsg", arg0, arg1, arg2, arg3, arg4, arg5, arg6)
ret0, _ := ret[0].(cid.Cid)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// SendMsg indicates an expected call of SendMsg.
func (mr *MockPreCommitBatcherApiMockRecorder) SendMsg(arg0, arg1, arg2, arg3, arg4, arg5, arg6 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SendMsg", reflect.TypeOf((*MockPreCommitBatcherApi)(nil).SendMsg), arg0, arg1, arg2, arg3, arg4, arg5, arg6)
}
// StateMinerInfo mocks base method.
func (m *MockPreCommitBatcherApi) StateMinerInfo(arg0 context.Context, arg1 address.Address, arg2 sealing.TipSetToken) (miner.MinerInfo, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "StateMinerInfo", arg0, arg1, arg2)
ret0, _ := ret[0].(miner.MinerInfo)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// StateMinerInfo indicates an expected call of StateMinerInfo.
func (mr *MockPreCommitBatcherApiMockRecorder) StateMinerInfo(arg0, arg1, arg2 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "StateMinerInfo", reflect.TypeOf((*MockPreCommitBatcherApi)(nil).StateMinerInfo), arg0, arg1, arg2)
}

View File

@ -7,6 +7,9 @@ import (
"sync"
"time"
"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain/actors/policy"
"github.com/ipfs/go-cid"
"golang.org/x/xerrors"
@ -19,8 +22,11 @@ import (
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/chain/actors/builtin/miner"
"github.com/filecoin-project/lotus/extern/storage-sealing/sealiface"
"github.com/filecoin-project/lotus/node/config"
)
//go:generate go run github.com/golang/mock/mockgen -destination=mocks/mock_precommit_batcher.go -package=mocks . PreCommitBatcherApi
type PreCommitBatcherApi interface {
SendMsg(ctx context.Context, from, to address.Address, method abi.MethodNum, value, maxFee abi.TokenAmount, params []byte) (cid.Cid, error)
StateMinerInfo(context.Context, address.Address, TipSetToken) (miner.MinerInfo, error)
@ -37,10 +43,10 @@ type PreCommitBatcher struct {
maddr address.Address
mctx context.Context
addrSel AddrSel
feeCfg FeeConfig
feeCfg config.MinerFeeConfig
getConfig GetSealingConfigFunc
deadlines map[abi.SectorNumber]time.Time
cutoffs map[abi.SectorNumber]time.Time
todo map[abi.SectorNumber]*preCommitEntry
waiting map[abi.SectorNumber][]chan sealiface.PreCommitBatchRes
@ -49,7 +55,7 @@ type PreCommitBatcher struct {
lk sync.Mutex
}
func NewPreCommitBatcher(mctx context.Context, maddr address.Address, api PreCommitBatcherApi, addrSel AddrSel, feeCfg FeeConfig, getConfig GetSealingConfigFunc) *PreCommitBatcher {
func NewPreCommitBatcher(mctx context.Context, maddr address.Address, api PreCommitBatcherApi, addrSel AddrSel, feeCfg config.MinerFeeConfig, getConfig GetSealingConfigFunc) *PreCommitBatcher {
b := &PreCommitBatcher{
api: api,
maddr: maddr,
@ -58,7 +64,7 @@ func NewPreCommitBatcher(mctx context.Context, maddr address.Address, api PreCom
feeCfg: feeCfg,
getConfig: getConfig,
deadlines: map[abi.SectorNumber]time.Time{},
cutoffs: map[abi.SectorNumber]time.Time{},
todo: map[abi.SectorNumber]*preCommitEntry{},
waiting: map[abi.SectorNumber][]chan sealiface.PreCommitBatchRes{},
@ -120,30 +126,30 @@ func (b *PreCommitBatcher) batchWait(maxWait, slack time.Duration) <-chan time.T
return nil
}
var deadline time.Time
var cutoff time.Time
for sn := range b.todo {
sectorDeadline := b.deadlines[sn]
if deadline.IsZero() || (!sectorDeadline.IsZero() && sectorDeadline.Before(deadline)) {
deadline = sectorDeadline
sectorCutoff := b.cutoffs[sn]
if cutoff.IsZero() || (!sectorCutoff.IsZero() && sectorCutoff.Before(cutoff)) {
cutoff = sectorCutoff
}
}
for sn := range b.waiting {
sectorDeadline := b.deadlines[sn]
if deadline.IsZero() || (!sectorDeadline.IsZero() && sectorDeadline.Before(deadline)) {
deadline = sectorDeadline
sectorCutoff := b.cutoffs[sn]
if cutoff.IsZero() || (!sectorCutoff.IsZero() && sectorCutoff.Before(cutoff)) {
cutoff = sectorCutoff
}
}
if deadline.IsZero() {
if cutoff.IsZero() {
return time.After(maxWait)
}
deadline = deadline.Add(-slack)
if deadline.Before(now) {
cutoff = cutoff.Add(-slack)
if cutoff.Before(now) {
return time.After(time.Nanosecond) // can't return 0
}
wait := deadline.Sub(now)
wait := cutoff.Sub(now)
if wait > maxWait {
wait = maxWait
}
@ -191,7 +197,7 @@ func (b *PreCommitBatcher) maybeStartBatch(notif, after bool) ([]sealiface.PreCo
delete(b.waiting, sn)
delete(b.todo, sn)
delete(b.deadlines, sn)
delete(b.cutoffs, sn)
}
}
@ -224,21 +230,22 @@ func (b *PreCommitBatcher) processBatch(cfg sealiface.Config) ([]sealiface.PreCo
return []sealiface.PreCommitBatchRes{res}, xerrors.Errorf("couldn't get miner info: %w", err)
}
goodFunds := big.Add(deposit, b.feeCfg.MaxPreCommitGasFee)
maxFee := b.feeCfg.MaxPreCommitBatchGasFee.FeeForSectors(len(params.Sectors))
goodFunds := big.Add(deposit, maxFee)
from, _, err := b.addrSel(b.mctx, mi, api.PreCommitAddr, goodFunds, deposit)
if err != nil {
return []sealiface.PreCommitBatchRes{res}, xerrors.Errorf("no good address found: %w", err)
}
mcid, err := b.api.SendMsg(b.mctx, from, b.maddr, miner.Methods.PreCommitSectorBatch, deposit, b.feeCfg.MaxPreCommitGasFee, enc.Bytes())
mcid, err := b.api.SendMsg(b.mctx, from, b.maddr, miner.Methods.PreCommitSectorBatch, deposit, maxFee, enc.Bytes())
if err != nil {
return []sealiface.PreCommitBatchRes{res}, xerrors.Errorf("sending message failed: %w", err)
}
res.Msg = &mcid
log.Infow("Sent ProveCommitAggregate message", "cid", mcid, "from", from, "sectors", len(b.todo))
log.Infow("Sent PreCommitSectorBatch message", "cid", mcid, "from", from, "sectors", len(b.todo))
return []sealiface.PreCommitBatchRes{res}, nil
}
@ -254,7 +261,7 @@ func (b *PreCommitBatcher) AddPreCommit(ctx context.Context, s SectorInfo, depos
sn := s.SectorNumber
b.lk.Lock()
b.deadlines[sn] = getSectorDeadline(curEpoch, s)
b.cutoffs[sn] = getPreCommitCutoff(curEpoch, s)
b.todo[sn] = &preCommitEntry{
deposit: deposit,
pci: in,
@ -330,3 +337,24 @@ func (b *PreCommitBatcher) Stop(ctx context.Context) error {
return ctx.Err()
}
}
// TODO: If this returned epochs, it would make testing much easier
func getPreCommitCutoff(curEpoch abi.ChainEpoch, si SectorInfo) time.Time {
cutoffEpoch := si.TicketEpoch + policy.MaxPreCommitRandomnessLookback
for _, p := range si.Pieces {
if p.DealInfo == nil {
continue
}
startEpoch := p.DealInfo.DealSchedule.StartEpoch
if startEpoch < cutoffEpoch {
cutoffEpoch = startEpoch
}
}
if cutoffEpoch <= curEpoch {
return time.Now()
}
return time.Now().Add(time.Duration(cutoffEpoch-curEpoch) * time.Duration(build.BlockDelaySecs) * time.Second)
}

View File

@ -0,0 +1,258 @@
package sealing_test
import (
"bytes"
"context"
"sort"
"sync"
"testing"
"time"
"github.com/golang/mock/gomock"
"github.com/stretchr/testify/require"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/go-state-types/big"
miner5 "github.com/filecoin-project/specs-actors/v5/actors/builtin/miner"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/chain/actors/builtin/miner"
"github.com/filecoin-project/lotus/chain/types"
sealing "github.com/filecoin-project/lotus/extern/storage-sealing"
"github.com/filecoin-project/lotus/extern/storage-sealing/mocks"
"github.com/filecoin-project/lotus/extern/storage-sealing/sealiface"
"github.com/filecoin-project/lotus/node/config"
miner0 "github.com/filecoin-project/specs-actors/actors/builtin/miner"
)
var fc = config.MinerFeeConfig{
MaxPreCommitGasFee: types.FIL(types.FromFil(1)),
MaxCommitGasFee: types.FIL(types.FromFil(1)),
MaxTerminateGasFee: types.FIL(types.FromFil(1)),
MaxPreCommitBatchGasFee: config.BatchFeeConfig{Base: types.FIL(types.FromFil(3)), PerSector: types.FIL(types.FromFil(1))},
MaxCommitBatchGasFee: config.BatchFeeConfig{Base: types.FIL(types.FromFil(3)), PerSector: types.FIL(types.FromFil(1))},
}
func TestPrecommitBatcher(t *testing.T) {
t0123, err := address.NewFromString("t0123")
require.NoError(t, err)
ctx := context.Background()
as := func(ctx context.Context, mi miner.MinerInfo, use api.AddrUse, goodFunds, minFunds abi.TokenAmount) (address.Address, abi.TokenAmount, error) {
return t0123, big.Zero(), nil
}
maxBatch := miner5.PreCommitSectorBatchMaxSize
cfg := func() (sealiface.Config, error) {
return sealiface.Config{
MaxWaitDealsSectors: 2,
MaxSealingSectors: 0,
MaxSealingSectorsForDeals: 0,
WaitDealsDelay: time.Hour * 6,
AlwaysKeepUnsealedCopy: true,
BatchPreCommits: true,
MinPreCommitBatch: 1,
MaxPreCommitBatch: maxBatch,
PreCommitBatchWait: 24 * time.Hour,
PreCommitBatchSlack: 3 * time.Hour,
AggregateCommits: true,
MinCommitBatch: miner5.MinAggregatedSectors,
MaxCommitBatch: miner5.MaxAggregatedSectors,
CommitBatchWait: 24 * time.Hour,
CommitBatchSlack: 1 * time.Hour,
TerminateBatchMin: 1,
TerminateBatchMax: 100,
TerminateBatchWait: 5 * time.Minute,
}, nil
}
type promise func(t *testing.T)
type action func(t *testing.T, s *mocks.MockPreCommitBatcherApi, pcb *sealing.PreCommitBatcher) promise
actions := func(as ...action) action {
return func(t *testing.T, s *mocks.MockPreCommitBatcherApi, pcb *sealing.PreCommitBatcher) promise {
var ps []promise
for _, a := range as {
p := a(t, s, pcb)
if p != nil {
ps = append(ps, p)
}
}
if len(ps) > 0 {
return func(t *testing.T) {
for _, p := range ps {
p(t)
}
}
}
return nil
}
}
addSector := func(sn abi.SectorNumber) action {
return func(t *testing.T, s *mocks.MockPreCommitBatcherApi, pcb *sealing.PreCommitBatcher) promise {
var pcres sealiface.PreCommitBatchRes
var pcerr error
done := sync.Mutex{}
done.Lock()
si := sealing.SectorInfo{
SectorNumber: sn,
}
s.EXPECT().ChainHead(gomock.Any()).Return(nil, abi.ChainEpoch(1), nil)
go func() {
defer done.Unlock()
pcres, pcerr = pcb.AddPreCommit(ctx, si, big.Zero(), &miner0.SectorPreCommitInfo{
SectorNumber: si.SectorNumber,
SealedCID: fakePieceCid(t),
DealIDs: nil,
Expiration: 0,
})
}()
return func(t *testing.T) {
done.Lock()
require.NoError(t, pcerr)
require.Empty(t, pcres.Error)
require.Contains(t, pcres.Sectors, si.SectorNumber)
}
}
}
addSectors := func(sectors []abi.SectorNumber) action {
as := make([]action, len(sectors))
for i, sector := range sectors {
as[i] = addSector(sector)
}
return actions(as...)
}
waitPending := func(n int) action {
return func(t *testing.T, s *mocks.MockPreCommitBatcherApi, pcb *sealing.PreCommitBatcher) promise {
require.Eventually(t, func() bool {
p, err := pcb.Pending(ctx)
require.NoError(t, err)
return len(p) == n
}, time.Second*5, 10*time.Millisecond)
return nil
}
}
expectSend := func(expect []abi.SectorNumber) action {
return func(t *testing.T, s *mocks.MockPreCommitBatcherApi, pcb *sealing.PreCommitBatcher) promise {
s.EXPECT().StateMinerInfo(gomock.Any(), gomock.Any(), gomock.Any()).Return(miner.MinerInfo{Owner: t0123, Worker: t0123}, nil)
s.EXPECT().SendMsg(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), funMatcher(func(i interface{}) bool {
b := i.([]byte)
var params miner5.PreCommitSectorBatchParams
require.NoError(t, params.UnmarshalCBOR(bytes.NewReader(b)))
for s, number := range expect {
require.Equal(t, number, params.Sectors[s].SectorNumber)
}
return true
}))
return nil
}
}
flush := func(expect []abi.SectorNumber) action {
return func(t *testing.T, s *mocks.MockPreCommitBatcherApi, pcb *sealing.PreCommitBatcher) promise {
_ = expectSend(expect)(t, s, pcb)
r, err := pcb.Flush(ctx)
require.NoError(t, err)
require.Len(t, r, 1)
require.Empty(t, r[0].Error)
sort.Slice(r[0].Sectors, func(i, j int) bool {
return r[0].Sectors[i] < r[0].Sectors[j]
})
require.Equal(t, expect, r[0].Sectors)
return nil
}
}
getSectors := func(n int) []abi.SectorNumber {
out := make([]abi.SectorNumber, n)
for i := range out {
out[i] = abi.SectorNumber(i)
}
return out
}
tcs := map[string]struct {
actions []action
}{
"addSingle": {
actions: []action{
addSector(0),
waitPending(1),
flush([]abi.SectorNumber{0}),
},
},
"addTwo": {
actions: []action{
addSectors(getSectors(2)),
waitPending(2),
flush(getSectors(2)),
},
},
"addMax": {
actions: []action{
expectSend(getSectors(maxBatch)),
addSectors(getSectors(maxBatch)),
},
},
}
for name, tc := range tcs {
tc := tc
t.Run(name, func(t *testing.T) {
// create go mock controller here
mockCtrl := gomock.NewController(t)
// when test is done, assert expectations on all mock objects.
defer mockCtrl.Finish()
// create them mocks
pcapi := mocks.NewMockPreCommitBatcherApi(mockCtrl)
pcb := sealing.NewPreCommitBatcher(ctx, t0123, pcapi, as, fc, cfg)
var promises []promise
for _, a := range tc.actions {
p := a(t, pcapi, pcb)
if p != nil {
promises = append(promises, p)
}
}
for _, p := range promises {
p(t)
}
err := pcb.Stop(ctx)
require.NoError(t, err)
})
}
}
type funMatcher func(interface{}) bool
func (funMatcher) Matches(interface{}) bool {
return true
}
func (funMatcher) String() string {
return "fun"
}

View File

@ -18,6 +18,8 @@ type Config struct {
AlwaysKeepUnsealedCopy bool
FinalizeEarly bool
BatchPreCommits bool
MaxPreCommitBatch int
MinPreCommitBatch int

View File

@ -28,6 +28,7 @@ import (
sectorstorage "github.com/filecoin-project/lotus/extern/sector-storage"
"github.com/filecoin-project/lotus/extern/sector-storage/ffiwrapper"
"github.com/filecoin-project/lotus/extern/storage-sealing/sealiface"
"github.com/filecoin-project/lotus/node/config"
)
const SectorStorePrefix = "/sectors"
@ -66,6 +67,7 @@ type SealingAPI interface {
StateMinerPartitions(ctx context.Context, m address.Address, dlIdx uint64, tok TipSetToken) ([]api.Partition, error)
SendMsg(ctx context.Context, from, to address.Address, method abi.MethodNum, value, maxFee abi.TokenAmount, params []byte) (cid.Cid, error)
ChainHead(ctx context.Context) (TipSetToken, abi.ChainEpoch, error)
ChainBaseFee(context.Context, TipSetToken) (abi.TokenAmount, error)
ChainGetMessage(ctx context.Context, mc cid.Cid) (*types.Message, error)
ChainGetRandomnessFromBeacon(ctx context.Context, tok TipSetToken, personalization crypto.DomainSeparationTag, randEpoch abi.ChainEpoch, entropy []byte) (abi.Randomness, error)
ChainGetRandomnessFromTickets(ctx context.Context, tok TipSetToken, personalization crypto.DomainSeparationTag, randEpoch abi.ChainEpoch, entropy []byte) (abi.Randomness, error)
@ -78,9 +80,11 @@ type AddrSel func(ctx context.Context, mi miner.MinerInfo, use api.AddrUse, good
type Sealing struct {
api SealingAPI
feeCfg FeeConfig
feeCfg config.MinerFeeConfig
events Events
startupWait sync.WaitGroup
maddr address.Address
sealer sectorstorage.SectorManager
@ -112,12 +116,6 @@ type Sealing struct {
dealInfo *CurrentDealInfoManager
}
type FeeConfig struct {
MaxPreCommitGasFee abi.TokenAmount
MaxCommitGasFee abi.TokenAmount
MaxTerminateGasFee abi.TokenAmount
}
type openSector struct {
used abi.UnpaddedPieceSize // change to bitfield/rle when AddPiece gains offset support to better fill sectors
@ -134,7 +132,7 @@ type pendingPiece struct {
accepted func(abi.SectorNumber, abi.UnpaddedPieceSize, error)
}
func New(api SealingAPI, fc FeeConfig, events Events, maddr address.Address, ds datastore.Batching, sealer sectorstorage.SectorManager, sc SectorIDCounter, verif ffiwrapper.Verifier, prov ffiwrapper.Prover, pcp PreCommitPolicy, gc GetSealingConfigFunc, notifee SectorStateNotifee, as AddrSel) *Sealing {
func New(api SealingAPI, fc config.MinerFeeConfig, events Events, maddr address.Address, ds datastore.Batching, sealer sectorstorage.SectorManager, sc SectorIDCounter, verif ffiwrapper.Verifier, prov ffiwrapper.Prover, pcp PreCommitPolicy, gc GetSealingConfigFunc, notifee SectorStateNotifee, as AddrSel) *Sealing {
s := &Sealing{
api: api,
feeCfg: fc,
@ -166,6 +164,7 @@ func New(api SealingAPI, fc FeeConfig, events Events, maddr address.Address, ds
bySector: map[abi.SectorID]statSectorState{},
},
}
s.startupWait.Add(1)
s.sectors = statemachine.New(namespace.Wrap(ds, datastore.NewKey(SectorStorePrefix)), s, SectorInfo{})
@ -193,10 +192,14 @@ func (m *Sealing) Stop(ctx context.Context) error {
}
func (m *Sealing) Remove(ctx context.Context, sid abi.SectorNumber) error {
m.startupWait.Wait()
return m.sectors.Send(uint64(sid), SectorRemove{})
}
func (m *Sealing) Terminate(ctx context.Context, sid abi.SectorNumber) error {
m.startupWait.Wait()
return m.sectors.Send(uint64(sid), SectorTerminate{})
}

View File

@ -17,6 +17,8 @@ var ExistSectorStateList = map[SectorState]struct{}{
PreCommitBatchWait: {},
WaitSeed: {},
Committing: {},
CommitFinalize: {},
CommitFinalizeFailed: {},
SubmitCommit: {},
CommitWait: {},
SubmitCommitAggregate: {},
@ -65,6 +67,8 @@ const (
WaitSeed SectorState = "WaitSeed" // waiting for seed
Committing SectorState = "Committing" // compute PoRep
CommitFinalize SectorState = "CommitFinalize" // cleanup sector metadata before submitting the proof (early finalize)
CommitFinalizeFailed SectorState = "CommitFinalizeFailed"
// single commit
SubmitCommit SectorState = "SubmitCommit" // send commit message to the chain
@ -106,7 +110,7 @@ func toStatState(st SectorState) statSectorState {
switch st {
case UndefinedSectorState, Empty, WaitDeals, AddPiece:
return sstStaging
case Packing, GetTicket, PreCommit1, PreCommit2, PreCommitting, PreCommitWait, SubmitPreCommitBatch, PreCommitBatchWait, WaitSeed, Committing, SubmitCommit, CommitWait, SubmitCommitAggregate, CommitAggregateWait, FinalizeSector:
case Packing, GetTicket, PreCommit1, PreCommit2, PreCommitting, PreCommitWait, SubmitPreCommitBatch, PreCommitBatchWait, WaitSeed, Committing, CommitFinalize, SubmitCommit, CommitWait, SubmitCommitAggregate, CommitAggregateWait, FinalizeSector:
return sstSealing
case Proving, Removed, Removing, Terminating, TerminateWait, TerminateFinality, TerminateFailed:
return sstProving

View File

@ -334,7 +334,7 @@ func (m *Sealing) handlePreCommitting(ctx statemachine.Context, sector SectorInf
return nil
}
goodFunds := big.Add(deposit, m.feeCfg.MaxPreCommitGasFee)
goodFunds := big.Add(deposit, big.Int(m.feeCfg.MaxPreCommitGasFee))
from, _, err := m.addrSel(ctx.Context(), mi, api.PreCommitAddr, goodFunds, deposit)
if err != nil {
@ -342,7 +342,7 @@ func (m *Sealing) handlePreCommitting(ctx statemachine.Context, sector SectorInf
}
log.Infof("submitting precommit for sector %d (deposit: %s): ", sector.SectorNumber, deposit)
mcid, err := m.api.SendMsg(ctx.Context(), from, m.maddr, miner.Methods.PreCommitSector, deposit, m.feeCfg.MaxPreCommitGasFee, enc.Bytes())
mcid, err := m.api.SendMsg(ctx.Context(), from, m.maddr, miner.Methods.PreCommitSector, deposit, big.Int(m.feeCfg.MaxPreCommitGasFee), enc.Bytes())
if err != nil {
if params.ReplaceCapacity {
m.remarkForUpgrade(params.ReplaceSectorNumber)
@ -478,6 +478,11 @@ func (m *Sealing) handleCommitting(ctx statemachine.Context, sector SectorInfo)
}
}
cfg, err := m.getConfig()
if err != nil {
return xerrors.Errorf("getting config: %w", err)
}
log.Info("scheduling seal proof computation...")
log.Infof("KOMIT %d %x(%d); %x(%d); %v; r:%x; d:%x", sector.SectorNumber, sector.TicketValue, sector.TicketEpoch, sector.SeedValue, sector.SeedEpoch, sector.pieceInfos(), sector.CommR, sector.CommD)
@ -500,6 +505,24 @@ func (m *Sealing) handleCommitting(ctx statemachine.Context, sector SectorInfo)
return ctx.Send(SectorComputeProofFailed{xerrors.Errorf("computing seal proof failed(2): %w", err)})
}
{
tok, _, err := m.api.ChainHead(ctx.Context())
if err != nil {
log.Errorf("handleCommitting: api error, not proceeding: %+v", err)
return nil
}
if err := m.checkCommit(ctx.Context(), sector, proof, tok); err != nil {
return ctx.Send(SectorComputeProofFailed{xerrors.Errorf("commit check error: %w", err)})
}
}
if cfg.FinalizeEarly {
return ctx.Send(SectorProofReady{
Proof: proof,
})
}
return ctx.Send(SectorCommitted{
Proof: proof,
})
@ -524,7 +547,7 @@ func (m *Sealing) handleSubmitCommit(ctx statemachine.Context, sector SectorInfo
tok, _, err := m.api.ChainHead(ctx.Context())
if err != nil {
log.Errorf("handleCommitting: api error, not proceeding: %+v", err)
log.Errorf("handleSubmitCommit: api error, not proceeding: %+v", err)
return nil
}
@ -566,7 +589,7 @@ func (m *Sealing) handleSubmitCommit(ctx statemachine.Context, sector SectorInfo
collateral = big.Zero()
}
goodFunds := big.Add(collateral, m.feeCfg.MaxCommitGasFee)
goodFunds := big.Add(collateral, big.Int(m.feeCfg.MaxCommitGasFee))
from, _, err := m.addrSel(ctx.Context(), mi, api.CommitAddr, goodFunds, collateral)
if err != nil {
@ -574,7 +597,7 @@ func (m *Sealing) handleSubmitCommit(ctx statemachine.Context, sector SectorInfo
}
// TODO: check seed / ticket / deals are up to date
mcid, err := m.api.SendMsg(ctx.Context(), from, m.maddr, miner.Methods.ProveCommitSector, collateral, m.feeCfg.MaxCommitGasFee, enc.Bytes())
mcid, err := m.api.SendMsg(ctx.Context(), from, m.maddr, miner.Methods.ProveCommitSector, collateral, big.Int(m.feeCfg.MaxCommitGasFee), enc.Bytes())
if err != nil {
return ctx.Send(SectorCommitFailed{xerrors.Errorf("pushing message to mpool: %w", err)})
}
@ -590,15 +613,15 @@ func (m *Sealing) handleSubmitCommitAggregate(ctx statemachine.Context, sector S
}
res, err := m.commiter.AddCommit(ctx.Context(), sector, AggregateInput{
info: proof.AggregateSealVerifyInfo{
Info: proof.AggregateSealVerifyInfo{
Number: sector.SectorNumber,
Randomness: sector.TicketValue,
InteractiveRandomness: sector.SeedValue,
SealedCID: *sector.CommR,
UnsealedCID: *sector.CommD,
},
proof: sector.Proof, // todo: this correct??
spt: sector.SectorType,
Proof: sector.Proof, // todo: this correct??
Spt: sector.SectorType,
})
if err != nil {
return ctx.Send(SectorCommitFailed{xerrors.Errorf("queuing commit for aggregation failed: %w", err)})

View File

@ -19,6 +19,7 @@ import (
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/chain/actors/builtin/miner"
"github.com/filecoin-project/lotus/node/config"
)
type TerminateBatcherApi interface {
@ -34,7 +35,7 @@ type TerminateBatcher struct {
maddr address.Address
mctx context.Context
addrSel AddrSel
feeCfg FeeConfig
feeCfg config.MinerFeeConfig
getConfig GetSealingConfigFunc
todo map[SectorLocation]*bitfield.BitField // MinerSectorLocation -> BitField
@ -46,7 +47,7 @@ type TerminateBatcher struct {
lk sync.Mutex
}
func NewTerminationBatcher(mctx context.Context, maddr address.Address, api TerminateBatcherApi, addrSel AddrSel, feeCfg FeeConfig, getConfig GetSealingConfigFunc) *TerminateBatcher {
func NewTerminationBatcher(mctx context.Context, maddr address.Address, api TerminateBatcherApi, addrSel AddrSel, feeCfg config.MinerFeeConfig, getConfig GetSealingConfigFunc) *TerminateBatcher {
b := &TerminateBatcher{
api: api,
maddr: maddr,
@ -214,12 +215,12 @@ func (b *TerminateBatcher) processBatch(notif, after bool) (*cid.Cid, error) {
return nil, xerrors.Errorf("couldn't get miner info: %w", err)
}
from, _, err := b.addrSel(b.mctx, mi, api.TerminateSectorsAddr, b.feeCfg.MaxTerminateGasFee, b.feeCfg.MaxTerminateGasFee)
from, _, err := b.addrSel(b.mctx, mi, api.TerminateSectorsAddr, big.Int(b.feeCfg.MaxTerminateGasFee), big.Int(b.feeCfg.MaxTerminateGasFee))
if err != nil {
return nil, xerrors.Errorf("no good address found: %w", err)
}
mcid, err := b.api.SendMsg(b.mctx, from, b.maddr, miner.Methods.TerminateSectors, big.Zero(), b.feeCfg.MaxTerminateGasFee, enc.Bytes())
mcid, err := b.api.SendMsg(b.mctx, from, b.maddr, miner.Methods.TerminateSectors, big.Zero(), big.Int(b.feeCfg.MaxTerminateGasFee), enc.Bytes())
if err != nil {
return nil, xerrors.Errorf("sending message failed: %w", err)
}

4
go.mod
View File

@ -39,7 +39,7 @@ require (
github.com/filecoin-project/go-jsonrpc v0.1.4-0.20210217175800-45ea43ac2bec
github.com/filecoin-project/go-multistore v0.0.3
github.com/filecoin-project/go-padreader v0.0.0-20200903213702-ed5fae088b20
github.com/filecoin-project/go-paramfetch v0.0.2-0.20210330140417-936748d3f5ec
github.com/filecoin-project/go-paramfetch v0.0.2-0.20210614165157-25a6c7769498
github.com/filecoin-project/go-state-types v0.1.1-0.20210506134452-99b279731c48
github.com/filecoin-project/go-statemachine v0.0.0-20200925024713-05bd7c71fbfe
github.com/filecoin-project/go-statestore v0.1.1
@ -48,7 +48,7 @@ require (
github.com/filecoin-project/specs-actors/v2 v2.3.5
github.com/filecoin-project/specs-actors/v3 v3.1.1
github.com/filecoin-project/specs-actors/v4 v4.0.1
github.com/filecoin-project/specs-actors/v5 v5.0.0-20210602024058-0c296bb386bf
github.com/filecoin-project/specs-actors/v5 v5.0.0-20210609212542-73e0409ac77c
github.com/filecoin-project/specs-storage v0.1.1-0.20201105051918-5188d9774506
github.com/filecoin-project/test-vectors/schema v0.0.5
github.com/gbrlsnchs/jwt/v3 v3.0.0-beta.1

10
go.sum
View File

@ -298,8 +298,8 @@ github.com/filecoin-project/go-multistore v0.0.3 h1:vaRBY4YiA2UZFPK57RNuewypB8u0
github.com/filecoin-project/go-multistore v0.0.3/go.mod h1:kaNqCC4IhU4B1uyr7YWFHd23TL4KM32aChS0jNkyUvQ=
github.com/filecoin-project/go-padreader v0.0.0-20200903213702-ed5fae088b20 h1:+/4aUeUoKr6AKfPE3mBhXA5spIV6UcKdTYDPNU2Tdmg=
github.com/filecoin-project/go-padreader v0.0.0-20200903213702-ed5fae088b20/go.mod h1:mPn+LRRd5gEKNAtc+r3ScpW2JRU/pj4NBKdADYWHiak=
github.com/filecoin-project/go-paramfetch v0.0.2-0.20210330140417-936748d3f5ec h1:gExwWUiT1TcARkxGneS4nvp9C+wBsKU0bFdg7qFpNco=
github.com/filecoin-project/go-paramfetch v0.0.2-0.20210330140417-936748d3f5ec/go.mod h1:fZzmf4tftbwf9S37XRifoJlz7nCjRdIrMGLR07dKLCc=
github.com/filecoin-project/go-paramfetch v0.0.2-0.20210614165157-25a6c7769498 h1:G10ezOvpH1CLXQ19EA9VWNwyL0mg536ujSayjV0yg0k=
github.com/filecoin-project/go-paramfetch v0.0.2-0.20210614165157-25a6c7769498/go.mod h1:1FH85P8U+DUEmWk1Jkw3Bw7FrwTVUNHk/95PSPG+dts=
github.com/filecoin-project/go-state-types v0.0.0-20200903145444-247639ffa6ad/go.mod h1:IQ0MBPnonv35CJHtWSN3YY1Hz2gkPru1Q9qoaYLxx9I=
github.com/filecoin-project/go-state-types v0.0.0-20200904021452-1883f36ca2f4/go.mod h1:IQ0MBPnonv35CJHtWSN3YY1Hz2gkPru1Q9qoaYLxx9I=
github.com/filecoin-project/go-state-types v0.0.0-20200928172055-2df22083d8ab/go.mod h1:ezYnPf0bNkTsDibL/psSz5dy4B5awOJ/E7P2Saeep8g=
@ -331,8 +331,8 @@ github.com/filecoin-project/specs-actors/v4 v4.0.0/go.mod h1:TkHXf/l7Wyw4ZejyXIP
github.com/filecoin-project/specs-actors/v4 v4.0.1 h1:AiWrtvJZ63MHGe6rn7tPu4nSUY8bA1KDNszqJaD5+Fg=
github.com/filecoin-project/specs-actors/v4 v4.0.1/go.mod h1:TkHXf/l7Wyw4ZejyXIPS2rK8bBO0rdwhTZyQQgaglng=
github.com/filecoin-project/specs-actors/v5 v5.0.0-20210512015452-4fe3889fff57/go.mod h1:283yBMMUSDB2abcjP/hhrwTkhb9h3sfM6KGrep/ZlBI=
github.com/filecoin-project/specs-actors/v5 v5.0.0-20210602024058-0c296bb386bf h1:xt9A1omyhSDbQvpVk7Na1J15a/n8y0y4GQDLeiWLpFs=
github.com/filecoin-project/specs-actors/v5 v5.0.0-20210602024058-0c296bb386bf/go.mod h1:b/btpRl84Q9SeDKlyIoORBQwe2OTmq14POrYrVvBWCM=
github.com/filecoin-project/specs-actors/v5 v5.0.0-20210609212542-73e0409ac77c h1:GnDJ6q3QEm2ytTKjPFQSvczAltgCSb3j9F1FeynwvPA=
github.com/filecoin-project/specs-actors/v5 v5.0.0-20210609212542-73e0409ac77c/go.mod h1:b/btpRl84Q9SeDKlyIoORBQwe2OTmq14POrYrVvBWCM=
github.com/filecoin-project/specs-storage v0.1.1-0.20201105051918-5188d9774506 h1:Ur/l2+6qN+lQiqjozWWc5p9UDaAMDZKTlDS98oRnlIw=
github.com/filecoin-project/specs-storage v0.1.1-0.20201105051918-5188d9774506/go.mod h1:nJRRM7Aa9XVvygr3W9k6xGF46RWzr2zxF/iGoAIfA/g=
github.com/filecoin-project/test-vectors/schema v0.0.5 h1:w3zHQhzM4pYxJDl21avXjOKBLF8egrvwUwjpT8TquDg=
@ -705,6 +705,8 @@ github.com/ipfs/go-log/v2 v2.1.2-0.20200626104915-0016c0b4b3e4/go.mod h1:2v2nsGf
github.com/ipfs/go-log/v2 v2.1.2/go.mod h1:2v2nsGfZsvvAJz13SyFzf9ObaqwHiHxsPLEHntrv9KM=
github.com/ipfs/go-log/v2 v2.1.3 h1:1iS3IU7aXRlbgUpN8yTTpJ53NXYjAe37vcI5+5nYrzk=
github.com/ipfs/go-log/v2 v2.1.3/go.mod h1:/8d0SH3Su5Ooc31QlL1WysJhvyOTDCjcCZ9Axpmri6g=
github.com/ipfs/go-log/v2 v2.1.3 h1:1iS3IU7aXRlbgUpN8yTTpJ53NXYjAe37vcI5+5nYrzk=
github.com/ipfs/go-log/v2 v2.1.3/go.mod h1:/8d0SH3Su5Ooc31QlL1WysJhvyOTDCjcCZ9Axpmri6g=
github.com/ipfs/go-merkledag v0.0.3/go.mod h1:Oc5kIXLHokkE1hWGMBHw+oxehkAaTOqtEb7Zbh6BhLA=
github.com/ipfs/go-merkledag v0.0.6/go.mod h1:QYPdnlvkOg7GnQRofu9XZimC5ZW5Wi3bKys/4GQQfto=
github.com/ipfs/go-merkledag v0.2.3/go.mod h1:SQiXrtSts3KGNmgOzMICy5c0POOpUNQLvB3ClKnBAlk=

View File

@ -3,6 +3,7 @@ package kit
import (
"fmt"
"os"
"strings"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/lotus/build"
@ -11,6 +12,11 @@ import (
)
func init() {
bin := os.Args[0]
if !strings.HasSuffix(bin, ".test") {
panic("package itests/kit must only be imported from tests")
}
_ = logging.SetLogLevel("*", "INFO")
policy.SetConsensusMinerMinPower(abi.NewStoragePower(2048))
@ -22,4 +28,5 @@ func init() {
panic(fmt.Sprintf("failed to set BELLMAN_NO_GPU env variable: %s", err))
}
build.InsecurePoStValidation = true
}

View File

@ -38,6 +38,7 @@ var (
MessageTo, _ = tag.NewKey("message_to")
MessageNonce, _ = tag.NewKey("message_nonce")
ReceivedFrom, _ = tag.NewKey("received_from")
MsgValid, _ = tag.NewKey("message_valid")
Endpoint, _ = tag.NewKey("endpoint")
APIInterface, _ = tag.NewKey("api") // to distinguish between gateway api and full node api endpoint calls
@ -61,6 +62,12 @@ var (
MessageReceived = stats.Int64("message/received", "Counter for total received messages", stats.UnitDimensionless)
MessageValidationFailure = stats.Int64("message/failure", "Counter for message validation failures", stats.UnitDimensionless)
MessageValidationSuccess = stats.Int64("message/success", "Counter for message validation successes", stats.UnitDimensionless)
MessageValidationDuration = stats.Float64("message/validation_ms", "Duration of message validation", stats.UnitMilliseconds)
MpoolGetNonceDuration = stats.Float64("mpool/getnonce_ms", "Duration of getStateNonce in mpool", stats.UnitMilliseconds)
MpoolGetBalanceDuration = stats.Float64("mpool/getbalance_ms", "Duration of getStateBalance in mpool", stats.UnitMilliseconds)
MpoolAddTsDuration = stats.Float64("mpool/addts_ms", "Duration of addTs in mpool", stats.UnitMilliseconds)
MpoolAddDuration = stats.Float64("mpool/add_ms", "Duration of Add in mpool", stats.UnitMilliseconds)
MpoolPushDuration = stats.Float64("mpool/push_ms", "Duration of Push in mpool", stats.UnitMilliseconds)
BlockPublished = stats.Int64("block/published", "Counter for total locally published blocks", stats.UnitDimensionless)
BlockReceived = stats.Int64("block/received", "Counter for total received blocks", stats.UnitDimensionless)
BlockValidationFailure = stats.Int64("block/failure", "Counter for block validation failures", stats.UnitDimensionless)
@ -170,6 +177,31 @@ var (
Measure: MessageValidationSuccess,
Aggregation: view.Count(),
}
MessageValidationDurationView = &view.View{
Measure: MessageValidationDuration,
Aggregation: defaultMillisecondsDistribution,
TagKeys: []tag.Key{MsgValid, Local},
}
MpoolGetNonceDurationView = &view.View{
Measure: MpoolGetNonceDuration,
Aggregation: defaultMillisecondsDistribution,
}
MpoolGetBalanceDurationView = &view.View{
Measure: MpoolGetBalanceDuration,
Aggregation: defaultMillisecondsDistribution,
}
MpoolAddTsDurationView = &view.View{
Measure: MpoolAddTsDuration,
Aggregation: defaultMillisecondsDistribution,
}
MpoolAddDurationView = &view.View{
Measure: MpoolAddDuration,
Aggregation: defaultMillisecondsDistribution,
}
MpoolPushDurationView = &view.View{
Measure: MpoolPushDuration,
Aggregation: defaultMillisecondsDistribution,
}
PeerCountView = &view.View{
Measure: PeerCount,
Aggregation: view.LastValue(),
@ -313,6 +345,12 @@ var ChainNodeViews = append([]*view.View{
MessageReceivedView,
MessageValidationFailureView,
MessageValidationSuccessView,
MessageValidationDurationView,
MpoolGetNonceDurationView,
MpoolGetBalanceDurationView,
MpoolAddTsDurationView,
MpoolAddDurationView,
MpoolPushDurationView,
PubsubPublishMessageView,
PubsubDeliverMessageView,
PubsubRejectMessageView,

View File

@ -6,6 +6,8 @@ import (
"github.com/ipfs/go-cid"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/go-state-types/big"
miner5 "github.com/filecoin-project/specs-actors/v5/actors/builtin/miner"
"github.com/filecoin-project/lotus/chain/types"
@ -84,6 +86,9 @@ type SealingConfig struct {
AlwaysKeepUnsealedCopy bool
// Run sector finalization before submitting sector proof to the chain
FinalizeEarly bool
// enable / disable precommit batching (takes effect after nv13)
BatchPreCommits bool
// maximum precommit batch size - batches will be sent immediately above this size
@ -91,7 +96,7 @@ type SealingConfig struct {
MinPreCommitBatch int
// how long to wait before submitting a batch after crossing the minimum batch size
PreCommitBatchWait Duration
// time buffer for forceful batch submission before sectors in batch would start expiring
// time buffer for forceful batch submission before sectors/deal in batch would start expiring
PreCommitBatchSlack Duration
// enable / disable commit aggregation (takes effect after nv13)
@ -101,7 +106,7 @@ type SealingConfig struct {
MaxCommitBatch int
// how long to wait before submitting a batch after crossing the minimum batch size
CommitBatchWait Duration
// time buffer for forceful batch submission before sectors in batch would start expiring
// time buffer for forceful batch submission before sectors/deals in batch would start expiring
CommitBatchSlack Duration
TerminateBatchMax uint64
@ -114,9 +119,23 @@ type SealingConfig struct {
// todo TargetSectors - stop auto-pleding new sectors after this many sectors are sealed, default CC upgrade for deals sectors if above
}
type BatchFeeConfig struct {
Base types.FIL
PerSector types.FIL
}
func (b *BatchFeeConfig) FeeForSectors(nSectors int) abi.TokenAmount {
return big.Add(big.Int(b.Base), big.Mul(big.NewInt(int64(nSectors)), big.Int(b.PerSector)))
}
type MinerFeeConfig struct {
MaxPreCommitGasFee types.FIL
MaxCommitGasFee types.FIL
// maxBatchFee = maxBase + maxPerSector * nSectors
MaxPreCommitBatchGasFee BatchFeeConfig
MaxCommitBatchGasFee BatchFeeConfig
MaxTerminateGasFee types.FIL
MaxWindowPoStGasFee types.FIL
MaxPublishDealsFee types.FIL
@ -263,18 +282,19 @@ func DefaultStorageMiner() *StorageMiner {
MaxSealingSectorsForDeals: 0,
WaitDealsDelay: Duration(time.Hour * 6),
AlwaysKeepUnsealedCopy: true,
FinalizeEarly: false,
BatchPreCommits: true,
MinPreCommitBatch: 1, // we must have at least one proof to aggregate
MaxPreCommitBatch: miner5.PreCommitSectorBatchMaxSize, //
PreCommitBatchWait: Duration(24 * time.Hour), // this can be up to 6 days
PreCommitBatchSlack: Duration(3 * time.Hour),
MinPreCommitBatch: 1, // we must have at least one precommit to batch
MaxPreCommitBatch: miner5.PreCommitSectorBatchMaxSize, // up to 256 sectors
PreCommitBatchWait: Duration(24 * time.Hour), // this should be less than 31.5 hours, which is the expiration of a precommit ticket
PreCommitBatchSlack: Duration(3 * time.Hour), // time buffer for forceful batch submission before sectors/deals in batch would start expiring, higher value will lower the chances for message fail due to expiration
AggregateCommits: true,
MinCommitBatch: miner5.MinAggregatedSectors, // we must have at least four proofs to aggregate
MaxCommitBatch: miner5.MaxAggregatedSectors, // this is the maximum aggregation per FIP13
CommitBatchWait: Duration(24 * time.Hour), // this can be up to 6 days
CommitBatchSlack: Duration(1 * time.Hour),
MinCommitBatch: miner5.MinAggregatedSectors, // per FIP13, we must have at least four proofs to aggregate, where 4 is the cross over point where aggregation wins out on single provecommit gas costs
MaxCommitBatch: miner5.MaxAggregatedSectors, // maximum 819 sectors, this is the maximum aggregation per FIP13
CommitBatchWait: Duration(24 * time.Hour), // this can be up to 30 days
CommitBatchSlack: Duration(1 * time.Hour), // time buffer for forceful batch submission before sectors/deals in batch would start expiring, higher value will lower the chances for message fail due to expiration
TerminateBatchMin: 1,
TerminateBatchMax: 100,
@ -311,6 +331,16 @@ func DefaultStorageMiner() *StorageMiner {
Fees: MinerFeeConfig{
MaxPreCommitGasFee: types.MustParseFIL("0.025"),
MaxCommitGasFee: types.MustParseFIL("0.05"),
MaxPreCommitBatchGasFee: BatchFeeConfig{
Base: types.MustParseFIL("0.025"), // TODO: update before v1.10.0
PerSector: types.MustParseFIL("0.025"), // TODO: update before v1.10.0
},
MaxCommitBatchGasFee: BatchFeeConfig{
Base: types.MustParseFIL("0.05"), // TODO: update before v1.10.0
PerSector: types.MustParseFIL("0.05"), // TODO: update before v1.10.0
},
MaxTerminateGasFee: types.MustParseFIL("0.5"),
MaxWindowPoStGasFee: types.MustParseFIL("5"),
MaxPublishDealsFee: types.MustParseFIL("0.05"),

View File

@ -226,15 +226,15 @@ func (a *MpoolAPI) MpoolBatchPushMessage(ctx context.Context, msgs []*types.Mess
}
func (a *MpoolAPI) MpoolCheckMessages(ctx context.Context, protos []*api.MessagePrototype) ([][]api.MessageCheckStatus, error) {
return a.Mpool.CheckMessages(protos)
return a.Mpool.CheckMessages(ctx, protos)
}
func (a *MpoolAPI) MpoolCheckPendingMessages(ctx context.Context, from address.Address) ([][]api.MessageCheckStatus, error) {
return a.Mpool.CheckPendingMessages(from)
return a.Mpool.CheckPendingMessages(ctx, from)
}
func (a *MpoolAPI) MpoolCheckReplaceMessages(ctx context.Context, msgs []*types.Message) ([][]api.MessageCheckStatus, error) {
return a.Mpool.CheckReplaceMessages(msgs)
return a.Mpool.CheckReplaceMessages(ctx, msgs)
}
func (a *MpoolAPI) MpoolGetNonce(ctx context.Context, addr address.Address) (uint64, error) {

View File

@ -834,6 +834,7 @@ func NewSetSealConfigFunc(r repo.LockedRepo) (dtypes.SetSealingConfigFunc, error
MaxSealingSectorsForDeals: cfg.MaxSealingSectorsForDeals,
WaitDealsDelay: config.Duration(cfg.WaitDealsDelay),
AlwaysKeepUnsealedCopy: cfg.AlwaysKeepUnsealedCopy,
FinalizeEarly: cfg.FinalizeEarly,
BatchPreCommits: cfg.BatchPreCommits,
MinPreCommitBatch: cfg.MinPreCommitBatch,
@ -865,6 +866,7 @@ func NewGetSealConfigFunc(r repo.LockedRepo) (dtypes.GetSealingConfigFunc, error
MaxSealingSectorsForDeals: cfg.Sealing.MaxSealingSectorsForDeals,
WaitDealsDelay: time.Duration(cfg.Sealing.WaitDealsDelay),
AlwaysKeepUnsealedCopy: cfg.Sealing.AlwaysKeepUnsealedCopy,
FinalizeEarly: cfg.Sealing.FinalizeEarly,
BatchPreCommits: cfg.Sealing.BatchPreCommits,
MinPreCommitBatch: cfg.Sealing.MinPreCommitBatch,

View File

@ -360,6 +360,20 @@ func (s SealingAPIAdapter) ChainHead(ctx context.Context) (sealing.TipSetToken,
return head.Key().Bytes(), head.Height(), nil
}
func (s SealingAPIAdapter) ChainBaseFee(ctx context.Context, tok sealing.TipSetToken) (abi.TokenAmount, error) {
tsk, err := types.TipSetKeyFromBytes(tok)
if err != nil {
return big.Zero(), err
}
ts, err := s.delegate.ChainGetTipSet(ctx, tsk)
if err != nil {
return big.Zero(), err
}
return ts.Blocks()[0].ParentBaseFee, nil
}
func (s SealingAPIAdapter) ChainGetMessage(ctx context.Context, mc cid.Cid) (*types.Message, error) {
return s.delegate.ChainGetMessage(ctx, mc)
}

View File

@ -171,12 +171,6 @@ func (m *Miner) Run(ctx context.Context) error {
return xerrors.Errorf("getting miner info: %w", err)
}
fc := sealing.FeeConfig{
MaxPreCommitGasFee: abi.TokenAmount(m.feeCfg.MaxPreCommitGasFee),
MaxCommitGasFee: abi.TokenAmount(m.feeCfg.MaxCommitGasFee),
MaxTerminateGasFee: abi.TokenAmount(m.feeCfg.MaxTerminateGasFee),
}
var (
// consumer of chain head changes.
evts = events.NewEvents(ctx, m.api)
@ -205,7 +199,7 @@ func (m *Miner) Run(ctx context.Context) error {
)
// Instantiate the sealing FSM.
m.sealing = sealing.New(adaptedAPI, fc, evtsAdapter, m.maddr, m.ds, m.sealer, m.sc, m.verif, m.prover, &pcp, cfg, m.handleSealingNotifications, as)
m.sealing = sealing.New(adaptedAPI, m.feeCfg, evtsAdapter, m.maddr, m.ds, m.sealer, m.sc, m.verif, m.prover, &pcp, cfg, m.handleSealingNotifications, as)
// Run the sealing FSM.
go m.sealing.Run(ctx) //nolint:errcheck // logged intside the function