Merge pull request #1 from filecoin-project/master

Fastforward main branch
This commit is contained in:
dumikau 2023-03-09 14:46:03 +03:00 committed by GitHub
commit c099317bb3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
57 changed files with 1336 additions and 281 deletions

View File

@ -9,15 +9,9 @@ body:
options:
- label: This is **not** a security-related bug/issue. If it is, please follow the [security policy](https://github.com/filecoin-project/lotus/security/policy).
required: true
- label: This is **not** a question or a support request. If you have any lotus related questions, please ask in the [lotus forum](https://github.com/filecoin-project/lotus/discussions).
required: true
- label: This is **not** a new feature request. If it is, please file a [feature request](https://github.com/filecoin-project/lotus/issues/new?assignees=&labels=need%2Ftriage%2Ckind%2Ffeature&template=feature_request.yml) instead.
required: true
- label: This is **not** an enhancement request. If it is, please file a [improvement suggestion](https://github.com/filecoin-project/lotus/issues/new?assignees=&labels=need%2Ftriage%2Ckind%2Fenhancement&template=enhancement.yml) instead.
required: true
- label: I **have** searched on the [issue tracker](https://github.com/filecoin-project/lotus/issues) and the [lotus forum](https://github.com/filecoin-project/lotus/discussions), and there is no existing related issue or discussion.
required: true
- label: I am running the [`Latest release`](https://github.com/filecoin-project/lotus/releases), or the most recent RC (release candidate) for the upcoming release or the dev branch(master), or have an issue updating to any of these.
- label: I am running the [`Latest release`](https://github.com/filecoin-project/lotus/releases), the most recent RC (release candidate) for the upcoming release, or the dev branch (master), or have an issue updating to any of these.
required: true
- label: I did not make any code changes to lotus.
required: false
@ -28,19 +22,11 @@ body:
options:
- label: lotus daemon - chain sync
required: false
- label: lotus miner - mining and block production
- label: lotus fvm/fevm - Lotus FVM and FEVM interactions
required: false
- label: lotus miner/worker - sealing
required: false
- label: lotus miner - proving(WindowPoSt)
required: false
- label: lotus miner/market - storage deal
required: false
- label: lotus miner/market - retrieval deal
required: false
- label: lotus miner/market - data transfer
required: false
- label: lotus client
- label: lotus miner - proving(WindowPoSt/WinningPoSt)
required: false
- label: lotus JSON-RPC API
required: false
@ -56,22 +42,33 @@ body:
description: Enter the output of `lotus version` and `lotus-miner version` if applicable.
placeholder: |
e.g.
Daemon:1.11.0-rc2+debug+git.0519cd371.dirty+api1.3.0
Local: lotus version 1.11.0-rc2+debug+git.0519cd371.dirty
Daemon: 1.19.0+mainnet+git.64059ca87+api1.5.0
Local: lotus-miner version 1.19.0+mainnet+git.64059ca87
validations:
required: true
- type: textarea
id: ReproSteps
attributes:
label: Repro Steps
description: "Steps to reproduce the behavior"
value: |
1. Run '...'
2. Do '...'
3. See error '...'
...
validations:
required: false
- type: textarea
id: Description
attributes:
label: Describe the Bug
description: |
This is where you get to tell us what went wrong. When doing so, please try to provide a clear and concise description of the bug with all related information:
* What you were doding when you experienced the bug?
* What you were doing when you experienced the bug?
* Any *error* messages you saw, *where* you saw them, and what you believe may have caused them (if you have any ideas).
* What is the expected behaviour?
* For sealing issues, include the output of `lotus-miner sectors status --log <sectorId>` for the failed sector(s).
* For proving issues, include the output of `lotus-miner proving info`.
* For deal making issues, include the output of `lotus client list-deals -v` and/or `lotus-miner storage-deals|retrieval-deals|data-transfers list [-v]` commands for the deal(s) in question.
validations:
required: true
- type: textarea
@ -83,18 +80,6 @@ body:
Please provide debug logs of the problem. Remember you can set the log level for:
* lotus: use `lotus log list` to get all log systems available and set the level with `lotus log set-level`. An example can be found [here](https://lotus.filecoin.io/lotus/configure/defaults/#log-level-control).
* lotus-miner: use `lotus-miner log list` to get all log systems available and set the level with `lotus-miner log set-level`.
If you don't provide detailed logs when you raise the issue it will almost certainly be the first request I make before further diagnosing the problem.
If you don't provide detailed logs when you raise the issue it will almost certainly be the first request we make before further diagnosing the problem.
validations:
required: true
- type: textarea
id: RepoSteps
attributes:
label: Repo Steps
description: "Steps to reproduce the behavior"
value: |
1. Run '...'
2. Do '...'
3. See error '...'
...
validations:
required: false
required: true

.github/ISSUE_TEMPLATE/config.yml vendored Normal file
View File

@ -0,0 +1,8 @@
blank_issues_enabled: true
contact_links:
- name: Ask a question about Lotus or get support
url: https://github.com/filecoin-project/lotus/discussions/new/choose
about: Ask a question or request support for using Lotus
- name: Filecoin protocol feature or enhancement
url: https://github.com/filecoin-project/FIPs/discussions/new/choose
about: Write a discussion in the Filecoin Improvement Proposal repo

View File

@ -7,13 +7,7 @@ body:
label: Checklist
description: Please check off the following boxes before continuing to create an improvement suggestion!
options:
- label: This is **not** a new feature or an enhancement to the Filecoin protocol. If it is, please open an [FIP issue](https://github.com/filecoin-project/FIPs/blob/master/FIPS/fip-0001.md).
required: true
- label: This is **not** a new feature request. If it is, please file a [feature request](https://github.com/filecoin-project/lotus/issues/new?assignees=&labels=need%2Ftriage%2Ckind%2Ffeature&template=feature_request.yml) instead.
required: true
- label: This is **not** brainstorming ideas. If you have an idea you'd like to discuss, please open a new discussion on [the lotus forum](https://github.com/filecoin-project/lotus/discussions/categories/ideas) and select the category as `Ideas`.
required: true
- label: I **have** a specific, actionable, and well motivated improvement to propose.
- label: I **have** a specific, actionable, and well motivated improvement to an existing lotus feature.
required: true
- type: checkboxes
attributes:
@ -22,19 +16,11 @@ body:
options:
- label: lotus daemon - chain sync
required: false
- label: lotus miner - mining and block production
- label: lotus fvm/fevm - Lotus FVM and FEVM interactions
required: false
- label: lotus miner/worker - sealing
required: false
- label: lotus miner - proving(WindowPoSt)
required: false
- label: lotus miner/market - storage deal
required: false
- label: lotus miner/market - retrieval deal
required: false
- label: lotus miner/market - data transfer
required: false
- label: lotus client
- label: lotus miner - proving(WindowPoSt/WinningPoSt)
required: false
- label: lotus JSON-RPC API
required: false
@ -45,9 +31,17 @@ body:
- type: textarea
id: request
attributes:
label: Improvement Suggestion
description: A clear and concise description of what the motivation or the current problem is and what is the suggested improvement?
placeholder: Ex. Currently lotus... However, as a storage provider, I'd like...
label: Enhancement Suggestion
description: A clear and concise description of the suggested enhancement.
placeholder: Ex. Currently lotus... However it would be great if [enhancement] was implemented... With the ability to...
validations:
required: true
- type: textarea
id: usecase
attributes:
label: Use-Case
description: How would this enhancement help you?
placeholder: Ex. With the [enhancement] node operators would be able to... For Storage Providers it would enable...
validations:
required: true

View File

@ -7,8 +7,6 @@ body:
label: Checklist
description: Please check off the following boxes before continuing to create a new feature request!
options:
- label: This is **not** a new feature or an enhancement to the Filecoin protocol. If it is, please open an [FIP issue](https://github.com/filecoin-project/FIPs/blob/master/FIPS/fip-0001.md).
required: true
- label: This is **not** brainstorming ideas. If you have an idea you'd like to discuss, please open a new discussion on [the lotus forum](https://github.com/filecoin-project/lotus/discussions/categories/ideas) and select the category as `Ideas`.
required: true
- label: I **have** a specific, actionable, and well motivated feature request to propose.
@ -20,19 +18,11 @@ body:
options:
- label: lotus daemon - chain sync
required: false
- label: lotus miner - mining and block production
- label: lotus fvm/fevm - Lotus FVM and FEVM interactions
required: false
- label: lotus miner/worker - sealing
required: false
- label: lotus miner - proving(WindowPoSt)
required: false
- label: lotus miner/market - storage deal
required: false
- label: lotus miner/market - retrieval deal
required: false
- label: lotus miner/market - data transfer
required: false
- label: lotus client
- label: lotus miner - proving(WindowPoSt/WinningPoSt)
required: false
- label: lotus JSON-RPC API
required: false
@ -56,7 +46,7 @@ body:
validations:
required: true
- type: textarea
id: alternates
id: alternatives
attributes:
label: Describe alternatives you've considered
description: A clear and concise description of any alternative solutions or features you've considered.
@ -69,4 +59,3 @@ body:
description: Add any other context, design docs or screenshots about the feature request here.
validations:
required: false

View File

@ -0,0 +1,83 @@
name: "Bug Report - developer/service provider"
description: "Bug report template about FEVM/FVM for developers/service providers"
labels: [need/triage, kind/bug, area/fevm]
body:
- type: checkboxes
attributes:
label: Checklist
description: Please check off the following boxes before continuing to file a bug report!
options:
- label: This is **not** a security-related bug/issue. If it is, please follow the [security policy](https://github.com/filecoin-project/lotus/security/policy).
required: true
- label: I **have** searched on the [issue tracker](https://github.com/filecoin-project/lotus/issues) and the [lotus forum](https://github.com/filecoin-project/lotus/discussions), and there is no existing related issue or discussion.
required: true
- label: I did not make any code changes to lotus.
required: false
- type: checkboxes
attributes:
label: Lotus component
description: Please select the lotus component you are filing a bug for
options:
- label: lotus Ethereum RPC
required: false
- label: lotus FVM - Lotus FVM interactions
required: false
- label: FEVM tooling
required: false
- label: Other
required: false
- type: textarea
id: version
attributes:
label: Lotus Version
render: text
description: Enter the output of `lotus version` if applicable.
placeholder: |
e.g.
Daemon: 1.19.0+mainnet+git.64059ca87+api1.5.0
Local: lotus-miner version 1.19.0+mainnet+git.64059ca87
validations:
required: true
- type: textarea
id: repro
attributes:
label: Repro Steps
description: "Steps to reproduce the behavior"
value: |
1. Run '...'
2. Do '...'
3. See error '...'
...
validations:
required: false
- type: textarea
id: Description
attributes:
label: Describe the Bug
description: |
This is where you get to tell us what went wrong. When doing so, please try to provide a clear and concise description of the bug with all related information:
* What were you doing when you experienced the bug? What were you trying to build?
* Any *error* messages and logs you saw, *where* you saw them, and what you believe may have caused them (if you have any ideas).
* What is the expected behaviour? Links to the actual code?
validations:
required: true
- type: textarea
id: toolingInfo
attributes:
label: Tooling
render: text
description: |
What kind of tooling are you using:
* Are you using ethers.js, Alchemy, Hardhat, etc.?
validations:
required: true
- type: textarea
id: extraInfo
attributes:
label: Configuration Options
render: text
description: |
Please provide your updated FEVM-related configuration options, or custom environment variables related to Lotus FEVM
* lotus: use `lotus config updated` to get your configuration options, and copy the [FEVM] section
validations:
required: true

View File

@ -183,10 +183,14 @@ type FullNode interface {
// nodes.
ChainExportRangeInternal(ctx context.Context, head, tail types.TipSetKey, cfg ChainExportConfig) error //perm:admin
// ChainPrune prunes the stored chain state and garbage collects; only supported if you
// ChainPrune forces compaction on cold store and garbage collects; only supported if you
// are using the splitstore
ChainPrune(ctx context.Context, opts PruneOpts) error //perm:admin
// ChainHotGC does online (badger) GC on the hot store; only supported if you are using
// the splitstore
ChainHotGC(ctx context.Context, opts HotGCOpts) error //perm:admin
// ChainCheckBlockstore performs an (asynchronous) health check on the chain/state blockstore
// if supported by the underlying implementation.
ChainCheckBlockstore(context.Context) error //perm:admin
@ -1354,6 +1358,12 @@ type PruneOpts struct {
RetainState int64
}
type HotGCOpts struct {
Threshold float64
Periodic bool
Moving bool
}
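As an aside, a minimal sketch of how a client might drive these options against the new `ChainHotGC` endpoint; the `node` handle, `ctx`, and the chosen values are illustrative assumptions, not part of this diff:

```go
// Sketch: trigger online GC on the hot store via the v1 full node API.
// Assumes `node` is an api.FullNode client and `ctx` a context.Context.
opts := api.HotGCOpts{
	Threshold: 0.01,  // reclaim a badger vlog once >=1% of it is garbage
	Periodic:  false, // true keeps collecting until no vlog exceeds the threshold
	Moving:    false, // true requests a full moving GC instead of online GC
}
if err := node.ChainHotGC(ctx, opts); err != nil {
	return err
}
```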
type EthTxReceipt struct {
TransactionHash ethtypes.EthHash `json:"transactionHash"`
TransactionIndex ethtypes.EthUint64 `json:"transactionIndex"`

View File

@ -15,6 +15,7 @@ import (
apitypes "github.com/filecoin-project/lotus/api/types"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/chain/types/ethtypes"
"github.com/filecoin-project/lotus/node/modules/dtypes"
)
// MODIFYING THE API INTERFACE
@ -25,7 +26,7 @@ import (
// When adding / changing methods in this file:
// * Do the change here
// * Adjust implementation in `node/impl/`
// * Run `make gen` - this will:
// * Run `make clean && make deps && make gen` - this will:
// * Generate proxy structs
// * Generate mocks
// * Generate markdown docs
@ -47,15 +48,18 @@ type Gateway interface {
ChainReadObj(context.Context, cid.Cid) ([]byte, error)
ChainGetGenesis(context.Context) (*types.TipSet, error)
GasEstimateMessageGas(ctx context.Context, msg *types.Message, spec *MessageSendSpec, tsk types.TipSetKey) (*types.Message, error)
MpoolGetNonce(ctx context.Context, addr address.Address) (uint64, error)
MpoolPush(ctx context.Context, sm *types.SignedMessage) (cid.Cid, error)
MsigGetAvailableBalance(ctx context.Context, addr address.Address, tsk types.TipSetKey) (types.BigInt, error)
MsigGetPending(context.Context, address.Address, types.TipSetKey) ([]*MsigTransaction, error)
MsigGetVested(ctx context.Context, addr address.Address, start types.TipSetKey, end types.TipSetKey) (types.BigInt, error)
MsigGetVestingSchedule(ctx context.Context, addr address.Address, tsk types.TipSetKey) (MsigVesting, error)
StateAccountKey(ctx context.Context, addr address.Address, tsk types.TipSetKey) (address.Address, error)
StateCall(ctx context.Context, msg *types.Message, tsk types.TipSetKey) (*InvocResult, error)
StateDealProviderCollateralBounds(ctx context.Context, size abi.PaddedPieceSize, verified bool, tsk types.TipSetKey) (DealCollateralBounds, error)
StateDecodeParams(ctx context.Context, toAddr address.Address, method abi.MethodNum, params []byte, tsk types.TipSetKey) (interface{}, error)
StateGetActor(ctx context.Context, actor address.Address, ts types.TipSetKey) (*types.Actor, error)
StateReadState(ctx context.Context, actor address.Address, tsk types.TipSetKey) (*ActorState, error) //perm:read
StateReadState(ctx context.Context, actor address.Address, tsk types.TipSetKey) (*ActorState, error)
StateListMiners(ctx context.Context, tsk types.TipSetKey) ([]address.Address, error)
StateLookupID(ctx context.Context, addr address.Address, tsk types.TipSetKey) (address.Address, error)
StateMarketBalance(ctx context.Context, addr address.Address, tsk types.TipSetKey) (MarketBalance, error)
@ -63,6 +67,7 @@ type Gateway interface {
StateMinerInfo(ctx context.Context, actor address.Address, tsk types.TipSetKey) (MinerInfo, error)
StateMinerProvingDeadline(ctx context.Context, addr address.Address, tsk types.TipSetKey) (*dline.Info, error)
StateMinerPower(context.Context, address.Address, types.TipSetKey) (*MinerPower, error)
StateNetworkName(context.Context) (dtypes.NetworkName, error)
StateNetworkVersion(context.Context, types.TipSetKey) (apitypes.NetworkVersion, error)
StateSectorGetInfo(ctx context.Context, maddr address.Address, n abi.SectorNumber, tsk types.TipSetKey) (*miner.SectorOnChainInfo, error)
StateVerifiedClientStatus(ctx context.Context, addr address.Address, tsk types.TipSetKey) (*abi.StoragePower, error)

View File

@ -394,6 +394,20 @@ func (mr *MockFullNodeMockRecorder) ChainHead(arg0 interface{}) *gomock.Call {
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ChainHead", reflect.TypeOf((*MockFullNode)(nil).ChainHead), arg0)
}
// ChainHotGC mocks base method.
func (m *MockFullNode) ChainHotGC(arg0 context.Context, arg1 api.HotGCOpts) error {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "ChainHotGC", arg0, arg1)
ret0, _ := ret[0].(error)
return ret0
}
// ChainHotGC indicates an expected call of ChainHotGC.
func (mr *MockFullNodeMockRecorder) ChainHotGC(arg0, arg1 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ChainHotGC", reflect.TypeOf((*MockFullNode)(nil).ChainHotGC), arg0, arg1)
}
// ChainNotify mocks base method.
func (m *MockFullNode) ChainNotify(arg0 context.Context) (<-chan []*api.HeadChange, error) {
m.ctrl.T.Helper()

View File

@ -172,6 +172,8 @@ type FullNodeMethods struct {
ChainHead func(p0 context.Context) (*types.TipSet, error) `perm:"read"`
ChainHotGC func(p0 context.Context, p1 HotGCOpts) error `perm:"admin"`
ChainNotify func(p0 context.Context) (<-chan []*HeadChange, error) `perm:"read"`
ChainPrune func(p0 context.Context, p1 PruneOpts) error `perm:"admin"`
@ -722,6 +724,8 @@ type GatewayMethods struct {
GasEstimateMessageGas func(p0 context.Context, p1 *types.Message, p2 *MessageSendSpec, p3 types.TipSetKey) (*types.Message, error) ``
MpoolGetNonce func(p0 context.Context, p1 address.Address) (uint64, error) ``
MpoolPush func(p0 context.Context, p1 *types.SignedMessage) (cid.Cid, error) ``
MsigGetAvailableBalance func(p0 context.Context, p1 address.Address, p2 types.TipSetKey) (types.BigInt, error) ``
@ -738,8 +742,12 @@ type GatewayMethods struct {
StateAccountKey func(p0 context.Context, p1 address.Address, p2 types.TipSetKey) (address.Address, error) ``
StateCall func(p0 context.Context, p1 *types.Message, p2 types.TipSetKey) (*InvocResult, error) ``
StateDealProviderCollateralBounds func(p0 context.Context, p1 abi.PaddedPieceSize, p2 bool, p3 types.TipSetKey) (DealCollateralBounds, error) ``
StateDecodeParams func(p0 context.Context, p1 address.Address, p2 abi.MethodNum, p3 []byte, p4 types.TipSetKey) (interface{}, error) ``
StateGetActor func(p0 context.Context, p1 address.Address, p2 types.TipSetKey) (*types.Actor, error) ``
StateListMiners func(p0 context.Context, p1 types.TipSetKey) ([]address.Address, error) ``
@ -756,9 +764,11 @@ type GatewayMethods struct {
StateMinerProvingDeadline func(p0 context.Context, p1 address.Address, p2 types.TipSetKey) (*dline.Info, error) ``
StateNetworkName func(p0 context.Context) (dtypes.NetworkName, error) ``
StateNetworkVersion func(p0 context.Context, p1 types.TipSetKey) (apitypes.NetworkVersion, error) ``
StateReadState func(p0 context.Context, p1 address.Address, p2 types.TipSetKey) (*ActorState, error) `perm:"read"`
StateReadState func(p0 context.Context, p1 address.Address, p2 types.TipSetKey) (*ActorState, error) ``
StateSearchMsg func(p0 context.Context, p1 types.TipSetKey, p2 cid.Cid, p3 abi.ChainEpoch, p4 bool) (*MsgLookup, error) ``
@ -1617,6 +1627,17 @@ func (s *FullNodeStub) ChainHead(p0 context.Context) (*types.TipSet, error) {
return nil, ErrNotSupported
}
func (s *FullNodeStruct) ChainHotGC(p0 context.Context, p1 HotGCOpts) error {
if s.Internal.ChainHotGC == nil {
return ErrNotSupported
}
return s.Internal.ChainHotGC(p0, p1)
}
func (s *FullNodeStub) ChainHotGC(p0 context.Context, p1 HotGCOpts) error {
return ErrNotSupported
}
func (s *FullNodeStruct) ChainNotify(p0 context.Context) (<-chan []*HeadChange, error) {
if s.Internal.ChainNotify == nil {
return nil, ErrNotSupported
@ -4576,6 +4597,17 @@ func (s *GatewayStub) GasEstimateMessageGas(p0 context.Context, p1 *types.Messag
return nil, ErrNotSupported
}
func (s *GatewayStruct) MpoolGetNonce(p0 context.Context, p1 address.Address) (uint64, error) {
if s.Internal.MpoolGetNonce == nil {
return 0, ErrNotSupported
}
return s.Internal.MpoolGetNonce(p0, p1)
}
func (s *GatewayStub) MpoolGetNonce(p0 context.Context, p1 address.Address) (uint64, error) {
return 0, ErrNotSupported
}
func (s *GatewayStruct) MpoolPush(p0 context.Context, p1 *types.SignedMessage) (cid.Cid, error) {
if s.Internal.MpoolPush == nil {
return *new(cid.Cid), ErrNotSupported
@ -4664,6 +4696,17 @@ func (s *GatewayStub) StateAccountKey(p0 context.Context, p1 address.Address, p2
return *new(address.Address), ErrNotSupported
}
func (s *GatewayStruct) StateCall(p0 context.Context, p1 *types.Message, p2 types.TipSetKey) (*InvocResult, error) {
if s.Internal.StateCall == nil {
return nil, ErrNotSupported
}
return s.Internal.StateCall(p0, p1, p2)
}
func (s *GatewayStub) StateCall(p0 context.Context, p1 *types.Message, p2 types.TipSetKey) (*InvocResult, error) {
return nil, ErrNotSupported
}
func (s *GatewayStruct) StateDealProviderCollateralBounds(p0 context.Context, p1 abi.PaddedPieceSize, p2 bool, p3 types.TipSetKey) (DealCollateralBounds, error) {
if s.Internal.StateDealProviderCollateralBounds == nil {
return *new(DealCollateralBounds), ErrNotSupported
@ -4675,6 +4718,17 @@ func (s *GatewayStub) StateDealProviderCollateralBounds(p0 context.Context, p1 a
return *new(DealCollateralBounds), ErrNotSupported
}
func (s *GatewayStruct) StateDecodeParams(p0 context.Context, p1 address.Address, p2 abi.MethodNum, p3 []byte, p4 types.TipSetKey) (interface{}, error) {
if s.Internal.StateDecodeParams == nil {
return nil, ErrNotSupported
}
return s.Internal.StateDecodeParams(p0, p1, p2, p3, p4)
}
func (s *GatewayStub) StateDecodeParams(p0 context.Context, p1 address.Address, p2 abi.MethodNum, p3 []byte, p4 types.TipSetKey) (interface{}, error) {
return nil, ErrNotSupported
}
func (s *GatewayStruct) StateGetActor(p0 context.Context, p1 address.Address, p2 types.TipSetKey) (*types.Actor, error) {
if s.Internal.StateGetActor == nil {
return nil, ErrNotSupported
@ -4763,6 +4817,17 @@ func (s *GatewayStub) StateMinerProvingDeadline(p0 context.Context, p1 address.A
return nil, ErrNotSupported
}
func (s *GatewayStruct) StateNetworkName(p0 context.Context) (dtypes.NetworkName, error) {
if s.Internal.StateNetworkName == nil {
return *new(dtypes.NetworkName), ErrNotSupported
}
return s.Internal.StateNetworkName(p0)
}
func (s *GatewayStub) StateNetworkName(p0 context.Context) (dtypes.NetworkName, error) {
return *new(dtypes.NetworkName), ErrNotSupported
}
func (s *GatewayStruct) StateNetworkVersion(p0 context.Context, p1 types.TipSetKey) (apitypes.NetworkVersion, error) {
if s.Internal.StateNetworkVersion == nil {
return *new(apitypes.NetworkVersion), ErrNotSupported

View File

@ -14,6 +14,7 @@ import (
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/node/modules/dtypes"
)
// MODIFYING THE API INTERFACE
@ -44,12 +45,15 @@ type Gateway interface {
ChainNotify(context.Context) (<-chan []*api.HeadChange, error)
ChainReadObj(context.Context, cid.Cid) ([]byte, error)
GasEstimateMessageGas(ctx context.Context, msg *types.Message, spec *api.MessageSendSpec, tsk types.TipSetKey) (*types.Message, error)
MpoolGetNonce(ctx context.Context, addr address.Address) (uint64, error)
MpoolPush(ctx context.Context, sm *types.SignedMessage) (cid.Cid, error)
MsigGetAvailableBalance(ctx context.Context, addr address.Address, tsk types.TipSetKey) (types.BigInt, error)
MsigGetVested(ctx context.Context, addr address.Address, start types.TipSetKey, end types.TipSetKey) (types.BigInt, error)
MsigGetPending(context.Context, address.Address, types.TipSetKey) ([]*api.MsigTransaction, error)
StateAccountKey(ctx context.Context, addr address.Address, tsk types.TipSetKey) (address.Address, error)
StateCall(ctx context.Context, msg *types.Message, tsk types.TipSetKey) (*api.InvocResult, error)
StateDealProviderCollateralBounds(ctx context.Context, size abi.PaddedPieceSize, verified bool, tsk types.TipSetKey) (api.DealCollateralBounds, error)
StateDecodeParams(ctx context.Context, toAddr address.Address, method abi.MethodNum, params []byte, tsk types.TipSetKey) (interface{}, error)
StateGetActor(ctx context.Context, actor address.Address, ts types.TipSetKey) (*types.Actor, error)
StateGetReceipt(context.Context, cid.Cid, types.TipSetKey) (*types.MessageReceipt, error)
StateListMiners(ctx context.Context, tsk types.TipSetKey) ([]address.Address, error)
@ -59,6 +63,7 @@ type Gateway interface {
StateMinerInfo(ctx context.Context, actor address.Address, tsk types.TipSetKey) (api.MinerInfo, error)
StateMinerProvingDeadline(ctx context.Context, addr address.Address, tsk types.TipSetKey) (*dline.Info, error)
StateMinerPower(context.Context, address.Address, types.TipSetKey) (*api.MinerPower, error)
StateNetworkName(context.Context) (dtypes.NetworkName, error)
StateNetworkVersion(context.Context, types.TipSetKey) (abinetwork.Version, error)
StateSearchMsg(ctx context.Context, msg cid.Cid) (*api.MsgLookup, error)
StateSectorGetInfo(ctx context.Context, maddr address.Address, n abi.SectorNumber, tsk types.TipSetKey) (*miner.SectorOnChainInfo, error)

View File

@ -451,6 +451,8 @@ type GatewayMethods struct {
GasEstimateMessageGas func(p0 context.Context, p1 *types.Message, p2 *api.MessageSendSpec, p3 types.TipSetKey) (*types.Message, error) ``
MpoolGetNonce func(p0 context.Context, p1 address.Address) (uint64, error) ``
MpoolPush func(p0 context.Context, p1 *types.SignedMessage) (cid.Cid, error) ``
MsigGetAvailableBalance func(p0 context.Context, p1 address.Address, p2 types.TipSetKey) (types.BigInt, error) ``
@ -461,8 +463,12 @@ type GatewayMethods struct {
StateAccountKey func(p0 context.Context, p1 address.Address, p2 types.TipSetKey) (address.Address, error) ``
StateCall func(p0 context.Context, p1 *types.Message, p2 types.TipSetKey) (*api.InvocResult, error) ``
StateDealProviderCollateralBounds func(p0 context.Context, p1 abi.PaddedPieceSize, p2 bool, p3 types.TipSetKey) (api.DealCollateralBounds, error) ``
StateDecodeParams func(p0 context.Context, p1 address.Address, p2 abi.MethodNum, p3 []byte, p4 types.TipSetKey) (interface{}, error) ``
StateGetActor func(p0 context.Context, p1 address.Address, p2 types.TipSetKey) (*types.Actor, error) ``
StateGetReceipt func(p0 context.Context, p1 cid.Cid, p2 types.TipSetKey) (*types.MessageReceipt, error) ``
@ -481,6 +487,8 @@ type GatewayMethods struct {
StateMinerProvingDeadline func(p0 context.Context, p1 address.Address, p2 types.TipSetKey) (*dline.Info, error) ``
StateNetworkName func(p0 context.Context) (dtypes.NetworkName, error) ``
StateNetworkVersion func(p0 context.Context, p1 types.TipSetKey) (abinetwork.Version, error) ``
StateSearchMsg func(p0 context.Context, p1 cid.Cid) (*api.MsgLookup, error) ``
@ -2677,6 +2685,17 @@ func (s *GatewayStub) GasEstimateMessageGas(p0 context.Context, p1 *types.Messag
return nil, ErrNotSupported
}
func (s *GatewayStruct) MpoolGetNonce(p0 context.Context, p1 address.Address) (uint64, error) {
if s.Internal.MpoolGetNonce == nil {
return 0, ErrNotSupported
}
return s.Internal.MpoolGetNonce(p0, p1)
}
func (s *GatewayStub) MpoolGetNonce(p0 context.Context, p1 address.Address) (uint64, error) {
return 0, ErrNotSupported
}
func (s *GatewayStruct) MpoolPush(p0 context.Context, p1 *types.SignedMessage) (cid.Cid, error) {
if s.Internal.MpoolPush == nil {
return *new(cid.Cid), ErrNotSupported
@ -2732,6 +2751,17 @@ func (s *GatewayStub) StateAccountKey(p0 context.Context, p1 address.Address, p2
return *new(address.Address), ErrNotSupported
}
func (s *GatewayStruct) StateCall(p0 context.Context, p1 *types.Message, p2 types.TipSetKey) (*api.InvocResult, error) {
if s.Internal.StateCall == nil {
return nil, ErrNotSupported
}
return s.Internal.StateCall(p0, p1, p2)
}
func (s *GatewayStub) StateCall(p0 context.Context, p1 *types.Message, p2 types.TipSetKey) (*api.InvocResult, error) {
return nil, ErrNotSupported
}
func (s *GatewayStruct) StateDealProviderCollateralBounds(p0 context.Context, p1 abi.PaddedPieceSize, p2 bool, p3 types.TipSetKey) (api.DealCollateralBounds, error) {
if s.Internal.StateDealProviderCollateralBounds == nil {
return *new(api.DealCollateralBounds), ErrNotSupported
@ -2743,6 +2773,17 @@ func (s *GatewayStub) StateDealProviderCollateralBounds(p0 context.Context, p1 a
return *new(api.DealCollateralBounds), ErrNotSupported
}
func (s *GatewayStruct) StateDecodeParams(p0 context.Context, p1 address.Address, p2 abi.MethodNum, p3 []byte, p4 types.TipSetKey) (interface{}, error) {
if s.Internal.StateDecodeParams == nil {
return nil, ErrNotSupported
}
return s.Internal.StateDecodeParams(p0, p1, p2, p3, p4)
}
func (s *GatewayStub) StateDecodeParams(p0 context.Context, p1 address.Address, p2 abi.MethodNum, p3 []byte, p4 types.TipSetKey) (interface{}, error) {
return nil, ErrNotSupported
}
func (s *GatewayStruct) StateGetActor(p0 context.Context, p1 address.Address, p2 types.TipSetKey) (*types.Actor, error) {
if s.Internal.StateGetActor == nil {
return nil, ErrNotSupported
@ -2842,6 +2883,17 @@ func (s *GatewayStub) StateMinerProvingDeadline(p0 context.Context, p1 address.A
return nil, ErrNotSupported
}
func (s *GatewayStruct) StateNetworkName(p0 context.Context) (dtypes.NetworkName, error) {
if s.Internal.StateNetworkName == nil {
return *new(dtypes.NetworkName), ErrNotSupported
}
return s.Internal.StateNetworkName(p0)
}
func (s *GatewayStub) StateNetworkName(p0 context.Context) (dtypes.NetworkName, error) {
return *new(dtypes.NetworkName), ErrNotSupported
}
func (s *GatewayStruct) StateNetworkVersion(p0 context.Context, p1 types.TipSetKey) (abinetwork.Version, error) {
if s.Internal.StateNetworkVersion == nil {
return *new(abinetwork.Version), ErrNotSupported

View File

@ -20,6 +20,7 @@ import (
pool "github.com/libp2p/go-buffer-pool"
"github.com/multiformats/go-base32"
"go.uber.org/zap"
"golang.org/x/xerrors"
"github.com/filecoin-project/lotus/blockstore"
)
@ -44,7 +45,8 @@ const (
// MemoryMap is equivalent to badger/options.MemoryMap.
MemoryMap = options.MemoryMap
// LoadToRAM is equivalent to badger/options.LoadToRAM.
LoadToRAM = options.LoadToRAM
LoadToRAM = options.LoadToRAM
defaultGCThreshold = 0.125
)
// Options embeds the badger options themselves, and augments them with
@ -439,7 +441,7 @@ func (b *Blockstore) deleteDB(path string) {
}
}
func (b *Blockstore) onlineGC() error {
func (b *Blockstore) onlineGC(ctx context.Context, threshold float64) error {
b.lockDB()
defer b.unlockDB()
@ -448,6 +450,9 @@ func (b *Blockstore) onlineGC() error {
if nworkers < 2 {
nworkers = 2
}
if nworkers > 7 { // max out at 1 goroutine per badger level
nworkers = 7
}
err := b.db.Flatten(nworkers)
if err != nil {
@ -455,7 +460,12 @@ func (b *Blockstore) onlineGC() error {
}
for err == nil {
err = b.db.RunValueLogGC(0.125)
select {
case <-ctx.Done():
err = ctx.Err()
default:
err = b.db.RunValueLogGC(threshold)
}
}
if err == badger.ErrNoRewrite {
@ -468,7 +478,7 @@ func (b *Blockstore) onlineGC() error {
// CollectGarbage compacts and runs garbage collection on the value log;
// implements the BlockstoreGC trait
func (b *Blockstore) CollectGarbage(opts ...blockstore.BlockstoreGCOption) error {
func (b *Blockstore) CollectGarbage(ctx context.Context, opts ...blockstore.BlockstoreGCOption) error {
if err := b.access(); err != nil {
return err
}
@ -485,8 +495,48 @@ func (b *Blockstore) CollectGarbage(opts ...blockstore.BlockstoreGCOption) error
if options.FullGC {
return b.movingGC()
}
threshold := options.Threshold
if threshold == 0 {
threshold = defaultGCThreshold
}
return b.onlineGC(ctx, threshold)
}
return b.onlineGC()
// GCOnce runs garbage collection on the value log;
// implements BlockstoreGCOnce trait
func (b *Blockstore) GCOnce(ctx context.Context, opts ...blockstore.BlockstoreGCOption) error {
if err := b.access(); err != nil {
return err
}
defer b.viewers.Done()
var options blockstore.BlockstoreGCOptions
for _, opt := range opts {
err := opt(&options)
if err != nil {
return err
}
}
if options.FullGC {
return xerrors.Errorf("FullGC option specified for GCOnce but full GC is non incremental")
}
threshold := options.Threshold
if threshold == 0 {
threshold = defaultGCThreshold
}
b.lockDB()
defer b.unlockDB()
// Note no compaction needed before single GC as we will hit at most one vlog anyway
err := b.db.RunValueLogGC(threshold)
if err == badger.ErrNoRewrite {
// not really an error in this case, it signals the end of GC
return nil
}
return err
}
// Size returns the aggregate size of the blockstore

View File

@ -145,7 +145,7 @@ func testMove(t *testing.T, optsF func(string) Options) {
return nil
})
g.Go(func() error {
return db.CollectGarbage(blockstore.WithFullGC(true))
return db.CollectGarbage(ctx, blockstore.WithFullGC(true))
})
err = g.Wait()
@ -230,7 +230,7 @@ func testMove(t *testing.T, optsF func(string) Options) {
checkPath()
// now do another FullGC to test the double move and following of symlinks
if err := db.CollectGarbage(blockstore.WithFullGC(true)); err != nil {
if err := db.CollectGarbage(ctx, blockstore.WithFullGC(true)); err != nil {
t.Fatal(err)
}

View File

@ -36,7 +36,12 @@ type BlockstoreIterator interface {
// BlockstoreGC is a trait for blockstores that support online garbage collection
type BlockstoreGC interface {
CollectGarbage(options ...BlockstoreGCOption) error
CollectGarbage(ctx context.Context, options ...BlockstoreGCOption) error
}
// BlockstoreGCOnce is a trait for a blockstore that supports incremental online garbage collection
type BlockstoreGCOnce interface {
GCOnce(ctx context.Context, options ...BlockstoreGCOption) error
}
// BlockstoreGCOption is a functional interface for controlling blockstore GC options
@ -45,6 +50,8 @@ type BlockstoreGCOption = func(*BlockstoreGCOptions) error
// BlockstoreGCOptions is a struct with GC options
type BlockstoreGCOptions struct {
FullGC bool
// fraction of garbage in badger vlog before it's worth processing in online GC
Threshold float64
}
func WithFullGC(fullgc bool) BlockstoreGCOption {
@ -54,6 +61,13 @@ func WithFullGC(fullgc bool) BlockstoreGCOption {
}
}
func WithThreshold(threshold float64) BlockstoreGCOption {
return func(opts *BlockstoreGCOptions) error {
opts.Threshold = threshold
return nil
}
}
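A quick sketch of how these functional options compose at a call site, under the assumption that `bs` is a badger-backed `*Blockstore` and `ctx` a `context.Context` (both illustrative):

```go
// Periodic online GC: flatten the LSM tree, then collect value logs
// until none has more than 20% garbage.
if err := bs.CollectGarbage(ctx, blockstore.WithThreshold(0.2)); err != nil {
	return err
}

// Incremental variant: process at most one value log, then return.
if err := bs.GCOnce(ctx, blockstore.WithThreshold(0.2)); err != nil {
	return err
}
```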
// BlockstoreSize is a trait for on-disk blockstores that can report their size
type BlockstoreSize interface {
Size() (int64, error)

View File

@ -794,7 +794,7 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error {
// we are done; do some housekeeping
s.endTxnProtect()
s.gcHotstore()
s.gcHotAfterCompaction()
err = s.setBaseEpoch(boundaryEpoch)
if err != nil {

View File

@ -7,7 +7,7 @@ import (
bstore "github.com/filecoin-project/lotus/blockstore"
)
func (s *SplitStore) gcHotstore() {
func (s *SplitStore) gcHotAfterCompaction() {
var opts []bstore.BlockstoreGCOption
if s.cfg.HotStoreFullGCFrequency > 0 && s.compactionIndex%int64(s.cfg.HotStoreFullGCFrequency) == 0 {
opts = append(opts, bstore.WithFullGC(true))
@ -23,7 +23,7 @@ func (s *SplitStore) gcBlockstore(b bstore.Blockstore, opts []bstore.BlockstoreG
log.Info("garbage collecting blockstore")
startGC := time.Now()
if err := gc.CollectGarbage(opts...); err != nil {
if err := gc.CollectGarbage(s.ctx, opts...); err != nil {
return err
}
@ -33,3 +33,19 @@ func (s *SplitStore) gcBlockstore(b bstore.Blockstore, opts []bstore.BlockstoreG
return fmt.Errorf("blockstore doesn't support garbage collection: %T", b)
}
func (s *SplitStore) gcBlockstoreOnce(b bstore.Blockstore, opts []bstore.BlockstoreGCOption) error {
if gc, ok := b.(bstore.BlockstoreGCOnce); ok {
log.Debug("gc blockstore once")
startGC := time.Now()
if err := gc.GCOnce(s.ctx, opts...); err != nil {
return err
}
log.Debugw("gc blockstore once done", "took", time.Since(startGC))
return nil
}
return fmt.Errorf("blockstore doesn't support gc once: %T", b)
}

View File

@ -47,6 +47,23 @@ var (
PruneThreshold = 7 * build.Finality
)
// GCHotStore runs online GC on the chain state in the hotstore according to the options specified
func (s *SplitStore) GCHotStore(opts api.HotGCOpts) error {
if opts.Moving {
gcOpts := []bstore.BlockstoreGCOption{bstore.WithFullGC(true)}
return s.gcBlockstore(s.hot, gcOpts)
}
gcOpts := []bstore.BlockstoreGCOption{bstore.WithThreshold(opts.Threshold)}
var err error
if opts.Periodic {
err = s.gcBlockstore(s.hot, gcOpts)
} else {
err = s.gcBlockstoreOnce(s.hot, gcOpts)
}
return err
}
// PruneChain instructs the SplitStore to prune chain state in the coldstore, according to the
// options specified.
func (s *SplitStore) PruneChain(opts api.PruneOpts) error {

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

View File

@ -235,7 +235,7 @@ func (t *TipSetExecutor) ApplyBlocks(ctx context.Context,
}
rErr := t.reward(ctx, vmi, em, epoch, ts, params)
if rErr != nil {
return cid.Undef, cid.Undef, xerrors.Errorf("error applying reward: %w", err)
return cid.Undef, cid.Undef, xerrors.Errorf("error applying reward: %w", rErr)
}
}
@ -308,6 +308,14 @@ func (t *TipSetExecutor) ExecuteTipSet(ctx context.Context,
}
}
if ts.Height() == 0 {
// NB: This is here because the process that executes blocks requires that the
// block miner reference a valid miner in the state tree. Unless we create some
// magical genesis miner, this won't work properly, so we short circuit here
// This avoids the question of 'who gets paid the genesis block reward'
return blks[0].ParentStateRoot, blks[0].ParentMessageReceipts, nil
}
var parentEpoch abi.ChainEpoch
pstate := blks[0].ParentStateRoot
if blks[0].Height > 0 {

View File

@ -52,14 +52,6 @@ func (sm *StateManager) TipSetState(ctx context.Context, ts *types.TipSet) (st c
sm.stlk.Unlock()
if ts.Height() == 0 {
// NB: This is here because the process that executes blocks requires that the
// block miner reference a valid miner in the state tree. Unless we create some
// magical genesis miner, this won't work properly, so we short circuit here
// This avoids the question of 'who gets paid the genesis block reward'
return ts.Blocks()[0].ParentStateRoot, ts.Blocks()[0].ParentMessageReceipts, nil
}
st, rec, err = sm.tsExec.ExecuteTipSet(ctx, sm, ts, sm.tsExecMonitor, false)
if err != nil {
return cid.Undef, cid.Undef, err

View File

@ -1591,7 +1591,64 @@ func createExportFile(app *cli.App, path string) (io.WriteCloser, error) {
var ChainPruneCmd = &cli.Command{
Name: "prune",
Usage: "prune the stored chain state and perform garbage collection",
Usage: "splitstore gc",
Subcommands: []*cli.Command{
chainPruneColdCmd,
chainPruneHotGCCmd,
chainPruneHotMovingGCCmd,
},
}
var chainPruneHotGCCmd = &cli.Command{
Name: "hot",
Usage: "run online (badger vlog) garbage collection on hotstore",
Flags: []cli.Flag{
&cli.Float64Flag{Name: "threshold", Value: 0.01, Usage: "Threshold of vlog garbage for gc"},
&cli.BoolFlag{Name: "periodic", Value: false, Usage: "Run periodic gc over multiple vlogs. Otherwise run gc once"},
},
Action: func(cctx *cli.Context) error {
api, closer, err := GetFullNodeAPIV1(cctx)
if err != nil {
return err
}
defer closer()
ctx := ReqContext(cctx)
opts := lapi.HotGCOpts{}
opts.Periodic = cctx.Bool("periodic")
opts.Threshold = cctx.Float64("threshold")
gcStart := time.Now()
err = api.ChainHotGC(ctx, opts)
gcTime := time.Since(gcStart)
fmt.Printf("Online GC took %v (periodic <%t> threshold <%f>)", gcTime, opts.Periodic, opts.Threshold)
return err
},
}
var chainPruneHotMovingGCCmd = &cli.Command{
Name: "hot-moving",
Usage: "run moving gc on hotstore",
Action: func(cctx *cli.Context) error {
api, closer, err := GetFullNodeAPIV1(cctx)
if err != nil {
return err
}
defer closer()
ctx := ReqContext(cctx)
opts := lapi.HotGCOpts{}
opts.Moving = true
gcStart := time.Now()
err = api.ChainHotGC(ctx, opts)
gcTime := time.Since(gcStart)
fmt.Printf("Moving GC took %v", gcTime)
return err
},
}
var chainPruneColdCmd = &cli.Command{
Name: "compact-cold",
Usage: "force splitstore compaction on cold store state and run gc",
Flags: []cli.Flag{
&cli.BoolFlag{
Name: "online-gc",

View File

@ -19,6 +19,7 @@ import (
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-bitfield"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/go-state-types/proof"
"github.com/filecoin-project/lotus/blockstore"
"github.com/filecoin-project/lotus/build"
@ -640,7 +641,42 @@ It will not send any messages to the chain.`,
if err != nil {
return err
}
jr, err := json.Marshal(res)
// convert sector information into an easily readable form
type PoStPartition struct {
Index uint64
Skipped []uint64
}
type SubmitWindowedPoStParams struct {
Deadline uint64
Partitions []PoStPartition
Proofs []proof.PoStProof
ChainCommitEpoch abi.ChainEpoch
ChainCommitRand abi.Randomness
}
var postParams []SubmitWindowedPoStParams
for _, i := range res {
var postParam SubmitWindowedPoStParams
postParam.Deadline = i.Deadline
// allocate the partitions slice before indexing into it; indexing a nil slice would panic
postParam.Partitions = make([]PoStPartition, len(i.Partitions))
for id, part := range i.Partitions {
postParam.Partitions[id].Index = part.Index
count, err := part.Skipped.Count()
if err != nil {
return err
}
sectors, err := part.Skipped.All(count)
if err != nil {
return err
}
postParam.Partitions[id].Skipped = sectors
}
postParam.Proofs = i.Proofs
postParam.ChainCommitEpoch = i.ChainCommitEpoch
postParam.ChainCommitRand = i.ChainCommitRand
postParams = append(postParams, postParam)
}
jr, err := json.MarshalIndent(postParams, "", " ")
if err != nil {
return err
}

View File

@ -29,6 +29,7 @@
* [ChainGetTipSetByHeight](#ChainGetTipSetByHeight)
* [ChainHasObj](#ChainHasObj)
* [ChainHead](#ChainHead)
* [ChainHotGC](#ChainHotGC)
* [ChainNotify](#ChainNotify)
* [ChainPrune](#ChainPrune)
* [ChainPutObj](#ChainPutObj)
@ -1074,6 +1075,26 @@ Response:
}
```
### ChainHotGC
ChainHotGC does online (badger) GC on the hot store; only supported if you are using
the splitstore
Perms: admin
Inputs:
```json
[
{
"Threshold": 12.3,
"Periodic": true,
"Moving": true
}
]
```
Response: `{}`
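For completeness, a hedged sketch of invoking this admin-only method from Go with the lotus RPC client; the endpoint, token handling, and option values are assumptions for illustration:

```go
// Sketch: call ChainHotGC over JSON-RPC; an admin token is required.
// Assumes imports of net/http, the lotus api package, and api/client.
headers := http.Header{"Authorization": []string{"Bearer " + adminToken}}
node, closer, err := client.NewFullNodeRPCV1(ctx, "ws://127.0.0.1:1234/rpc/v1", headers)
if err != nil {
	return err
}
defer closer()

return node.ChainHotGC(ctx, api.HotGCOpts{Threshold: 0.01, Periodic: true})
```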
### ChainNotify
ChainNotify returns channel with chain head updates.
First message is guaranteed to be of len == 1, and type == 'current'.
@ -1098,7 +1119,7 @@ Response:
```
### ChainPrune
ChainPrune prunes the stored chain state and garbage collects; only supported if you
ChainPrune forces compaction on cold store and garbage collects; only supported if you
are using the splitstore

View File

@ -2117,7 +2117,7 @@ COMMANDS:
decode decode various types
encode encode various types
disputer interact with the window post disputer
prune prune the stored chain state and perform garbage collection
prune splitstore gc
help, h Shows a list of commands or help for one command
OPTIONS:
@ -2465,10 +2465,29 @@ OPTIONS:
### lotus chain prune
```
NAME:
lotus chain prune - prune the stored chain state and perform garbage collection
lotus chain prune - splitstore gc
USAGE:
lotus chain prune [command options] [arguments...]
lotus chain prune command [command options] [arguments...]
COMMANDS:
compact-cold force splitstore compaction on cold store state and run gc
hot run online (badger vlog) garbage collection on hotstore
hot-moving run moving gc on hotstore
help, h Shows a list of commands or help for one command
OPTIONS:
--help, -h show help (default: false)
```
#### lotus chain prune compact-cold
```
NAME:
lotus chain prune compact-cold - force splitstore compaction on cold store state and run gc
USAGE:
lotus chain prune compact-cold [command options] [arguments...]
OPTIONS:
--moving-gc use moving gc for garbage collecting the coldstore (default: false)
@ -2477,6 +2496,33 @@ OPTIONS:
```
#### lotus chain prune hot
```
NAME:
lotus chain prune hot - run online (badger vlog) garbage collection on hotstore
USAGE:
lotus chain prune hot [command options] [arguments...]
OPTIONS:
--periodic Run periodic gc over multiple vlogs. Otherwise run gc once (default: false)
--threshold value Threshold of vlog garbage for gc (default: 0.01)
```
#### lotus chain prune hot-moving
```
NAME:
lotus chain prune hot-moving - run moving gc on hotstore
USAGE:
lotus chain prune hot-moving [command options] [arguments...]
OPTIONS:
--help, -h show help (default: false)
```
## lotus log
```
NAME:

View File

@ -27,6 +27,7 @@ import (
_ "github.com/filecoin-project/lotus/lib/sigs/secp"
"github.com/filecoin-project/lotus/metrics"
"github.com/filecoin-project/lotus/node/impl/full"
"github.com/filecoin-project/lotus/node/modules/dtypes"
)
const (
@ -59,18 +60,22 @@ type TargetAPI interface {
ChainPutObj(context.Context, blocks.Block) error
ChainGetGenesis(context.Context) (*types.TipSet, error)
GasEstimateMessageGas(ctx context.Context, msg *types.Message, spec *api.MessageSendSpec, tsk types.TipSetKey) (*types.Message, error)
MpoolGetNonce(ctx context.Context, addr address.Address) (uint64, error)
MpoolPushUntrusted(ctx context.Context, sm *types.SignedMessage) (cid.Cid, error)
MsigGetAvailableBalance(ctx context.Context, addr address.Address, tsk types.TipSetKey) (types.BigInt, error)
MsigGetVested(ctx context.Context, addr address.Address, start types.TipSetKey, end types.TipSetKey) (types.BigInt, error)
MsigGetVestingSchedule(context.Context, address.Address, types.TipSetKey) (api.MsigVesting, error)
MsigGetPending(ctx context.Context, addr address.Address, ts types.TipSetKey) ([]*api.MsigTransaction, error)
StateAccountKey(ctx context.Context, addr address.Address, tsk types.TipSetKey) (address.Address, error)
StateCall(ctx context.Context, msg *types.Message, tsk types.TipSetKey) (*api.InvocResult, error)
StateDealProviderCollateralBounds(ctx context.Context, size abi.PaddedPieceSize, verified bool, tsk types.TipSetKey) (api.DealCollateralBounds, error)
StateDecodeParams(ctx context.Context, toAddr address.Address, method abi.MethodNum, params []byte, tsk types.TipSetKey) (interface{}, error)
StateGetActor(ctx context.Context, actor address.Address, ts types.TipSetKey) (*types.Actor, error)
StateLookupID(ctx context.Context, addr address.Address, tsk types.TipSetKey) (address.Address, error)
StateListMiners(ctx context.Context, tsk types.TipSetKey) ([]address.Address, error)
StateMarketBalance(ctx context.Context, addr address.Address, tsk types.TipSetKey) (api.MarketBalance, error)
StateMarketStorageDeal(ctx context.Context, dealId abi.DealID, tsk types.TipSetKey) (*api.MarketDeal, error)
StateNetworkName(context.Context) (dtypes.NetworkName, error)
StateNetworkVersion(context.Context, types.TipSetKey) (network.Version, error)
StateSearchMsg(ctx context.Context, from types.TipSetKey, msg cid.Cid, limit abi.ChainEpoch, allowReplaced bool) (*api.MsgLookup, error)
StateWaitMsg(ctx context.Context, cid cid.Cid, confidence uint64, limit abi.ChainEpoch, allowReplaced bool) (*api.MsgLookup, error)

View File

@ -20,6 +20,7 @@ import (
"github.com/filecoin-project/lotus/chain/actors/builtin/miner"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/lib/sigs"
"github.com/filecoin-project/lotus/node/modules/dtypes"
)
func (gw *Node) Discover(ctx context.Context) (apitypes.OpenRPCDocument, error) {
@ -187,6 +188,13 @@ func (gw *Node) GasEstimateMessageGas(ctx context.Context, msg *types.Message, s
return gw.target.GasEstimateMessageGas(ctx, msg, spec, tsk)
}
func (gw *Node) MpoolGetNonce(ctx context.Context, addr address.Address) (uint64, error) {
if err := gw.limit(ctx, stateRateLimitTokens); err != nil {
return 0, err
}
return gw.target.MpoolGetNonce(ctx, addr)
}
func (gw *Node) MpoolPush(ctx context.Context, sm *types.SignedMessage) (cid.Cid, error) {
if err := gw.limit(ctx, stateRateLimitTokens); err != nil {
return cid.Cid{}, err
@ -248,6 +256,16 @@ func (gw *Node) StateAccountKey(ctx context.Context, addr address.Address, tsk t
return gw.target.StateAccountKey(ctx, addr, tsk)
}
func (gw *Node) StateCall(ctx context.Context, msg *types.Message, tsk types.TipSetKey) (*api.InvocResult, error) {
if err := gw.limit(ctx, stateRateLimitTokens); err != nil {
return nil, err
}
if err := gw.checkTipsetKey(ctx, tsk); err != nil {
return nil, err
}
return gw.target.StateCall(ctx, msg, tsk)
}
func (gw *Node) StateDealProviderCollateralBounds(ctx context.Context, size abi.PaddedPieceSize, verified bool, tsk types.TipSetKey) (api.DealCollateralBounds, error) {
if err := gw.limit(ctx, stateRateLimitTokens); err != nil {
return api.DealCollateralBounds{}, err
@ -258,6 +276,16 @@ func (gw *Node) StateDealProviderCollateralBounds(ctx context.Context, size abi.
return gw.target.StateDealProviderCollateralBounds(ctx, size, verified, tsk)
}
func (gw *Node) StateDecodeParams(ctx context.Context, toAddr address.Address, method abi.MethodNum, params []byte, tsk types.TipSetKey) (interface{}, error) {
if err := gw.limit(ctx, stateRateLimitTokens); err != nil {
return nil, err
}
if err := gw.checkTipsetKey(ctx, tsk); err != nil {
return nil, err
}
return gw.target.StateDecodeParams(ctx, toAddr, method, params, tsk)
}
func (gw *Node) StateGetActor(ctx context.Context, actor address.Address, tsk types.TipSetKey) (*types.Actor, error) {
if err := gw.limit(ctx, stateRateLimitTokens); err != nil {
return nil, err
@ -308,6 +336,13 @@ func (gw *Node) StateMarketStorageDeal(ctx context.Context, dealId abi.DealID, t
return gw.target.StateMarketStorageDeal(ctx, dealId, tsk)
}
func (gw *Node) StateNetworkName(ctx context.Context) (dtypes.NetworkName, error) {
if err := gw.limit(ctx, stateRateLimitTokens); err != nil {
return *new(dtypes.NetworkName), err
}
return gw.target.StateNetworkName(ctx)
}
func (gw *Node) StateNetworkVersion(ctx context.Context, tsk types.TipSetKey) (network.Version, error) {
if err := gw.limit(ctx, stateRateLimitTokens); err != nil {
return network.VersionMax, err

go.mod
View File

@ -40,7 +40,7 @@ require (
github.com/filecoin-project/go-fil-commcid v0.1.0
github.com/filecoin-project/go-fil-commp-hashhash v0.1.0
github.com/filecoin-project/go-fil-markets v1.26.0
github.com/filecoin-project/go-jsonrpc v0.2.1
github.com/filecoin-project/go-jsonrpc v0.2.2
github.com/filecoin-project/go-legs v0.4.4
github.com/filecoin-project/go-padreader v0.0.1
github.com/filecoin-project/go-paramfetch v0.0.4

go.sum
View File

@ -340,8 +340,8 @@ github.com/filecoin-project/go-hamt-ipld/v2 v2.0.0/go.mod h1:7aWZdaQ1b16BVoQUYR+
github.com/filecoin-project/go-hamt-ipld/v3 v3.0.1/go.mod h1:gXpNmr3oQx8l3o7qkGyDjJjYSRX7hp/FGOStdqrWyDI=
github.com/filecoin-project/go-hamt-ipld/v3 v3.1.0 h1:rVVNq0x6RGQIzCo1iiJlGFm9AGIZzeifggxtKMU7zmI=
github.com/filecoin-project/go-hamt-ipld/v3 v3.1.0/go.mod h1:bxmzgT8tmeVQA1/gvBwFmYdT8SOFUwB3ovSUfG1Ux0g=
github.com/filecoin-project/go-jsonrpc v0.2.1 h1:xfxkfIAO300sPiV59DnxnCb4sdTtWYlRz/TsP+ByT2E=
github.com/filecoin-project/go-jsonrpc v0.2.1/go.mod h1:jBSvPTl8V1N7gSTuCR4bis8wnQnIjHbRPpROol6iQKM=
github.com/filecoin-project/go-jsonrpc v0.2.2 h1:yo7Ga5qaSFfAukjyI6pdFBxzUVbQoHjKdYMpf2vMvh4=
github.com/filecoin-project/go-jsonrpc v0.2.2/go.mod h1:jBSvPTl8V1N7gSTuCR4bis8wnQnIjHbRPpROol6iQKM=
github.com/filecoin-project/go-legs v0.4.4 h1:mpMmAOOnamaz0CV9rgeKhEWA8j9kMC+f+UGCGrxKaZo=
github.com/filecoin-project/go-legs v0.4.4/go.mod h1:JQ3hA6xpJdbR8euZ2rO0jkxaMxeidXf0LDnVuqPAe9s=
github.com/filecoin-project/go-padreader v0.0.0-20200903213702-ed5fae088b20/go.mod h1:mPn+LRRd5gEKNAtc+r3ScpW2JRU/pj4NBKdADYWHiak=

View File

@ -87,3 +87,25 @@ func TestFilecoinAddressToEthAddress(t *testing.T) {
require.ErrorContains(t, err, ethtypes.ErrInvalidAddress.Error())
}
func TestEthGetGenesis(t *testing.T) {
blockTime := 100 * time.Millisecond
client, _, ens := kit.EnsembleMinimal(t, kit.MockProofs(), kit.ThroughRPC())
ens.InterconnectAll().BeginMining(blockTime)
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
defer cancel()
ethBlk, err := client.EVM().EthGetBlockByNumber(ctx, "0x0", true)
require.NoError(t, err)
genesis, err := client.ChainGetGenesis(ctx)
require.NoError(t, err)
genesisCid, err := genesis.Key().Cid()
require.NoError(t, err)
genesisHash, err := ethtypes.EthHashFromCid(genesisCid)
require.NoError(t, err)
require.Equal(t, ethBlk.Hash, genesisHash)
}

View File

@ -3,18 +3,42 @@ package itests
import (
"context"
"encoding/json"
"sort"
"testing"
"time"
"github.com/stretchr/testify/require"
"github.com/filecoin-project/go-jsonrpc"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/chain/types/ethtypes"
"github.com/filecoin-project/lotus/itests/kit"
"github.com/filecoin-project/lotus/lib/result"
"github.com/filecoin-project/lotus/node/impl/full"
)
// calculateExpectations calculates the expected number of items to be included in the response
// of eth_feeHistory. It takes care of null rounds by finding the closest tipset with height
// smaller than startHeight, and then looks back at requestAmount of items. It also considers
// scenarios where there are not enough items to look back.
func calculateExpectations(tsHeights []int, requestAmount, startHeight int) (count, oldestHeight int) {
latestIdx := sort.SearchInts(tsHeights, startHeight)
// SearchInts returns the index of the number that's larger than the target if the target
// doesn't exist. However, we're looking for the closest number that's smaller than the target
for tsHeights[latestIdx] > startHeight {
latestIdx--
}
cnt := requestAmount
oldestIdx := latestIdx - requestAmount + 1
if oldestIdx < 0 {
cnt = latestIdx + 1
oldestIdx = 0
}
return cnt, tsHeights[oldestIdx]
}
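To make the null-round handling concrete, a small worked example (the heights are chosen arbitrarily for illustration):

```go
// Suppose the chain skipped heights 7-9, so the sorted tipset heights are:
//   tsHeights = []int{1, 2, 3, 4, 5, 6, 10, 11}
// Asking for 5 items ending at startHeight 8:
//   SearchInts returns index 6 (height 10), which is > 8, so we step back to
//   index 5 (height 6), the closest executed tipset at or below 8.
//   oldestIdx = 5 - 5 + 1 = 1, so count = 5 and oldestHeight = tsHeights[1] = 2.
count, oldest := calculateExpectations([]int{1, 2, 3, 4, 5, 6, 10, 11}, 5, 8)
// count == 5, oldest == 2
```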
func TestEthFeeHistory(t *testing.T) {
require := require.New(t)
@ -22,70 +46,136 @@ func TestEthFeeHistory(t *testing.T) {
blockTime := 100 * time.Millisecond
client, _, ens := kit.EnsembleMinimal(t, kit.MockProofs(), kit.ThroughRPC())
ens.InterconnectAll().BeginMining(blockTime)
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
defer cancel()
// Wait for the network to create 20 blocks
heads, err := client.ChainNotify(ctx)
require.NoError(err)
// Save the full view of the tipsets to calculate the answer when there are null rounds
tsHeights := []int{1}
go func() {
for chg := range heads {
for _, c := range chg {
tsHeights = append(tsHeights, int(c.Val.Height()))
}
}
}()
miner := ens.InterconnectAll().BeginMining(blockTime)
client.WaitTillChain(ctx, kit.HeightAtLeast(7))
miner[0].InjectNulls(abi.ChainEpoch(5))
// Wait for the network to create at least 20 tipsets
client.WaitTillChain(ctx, kit.HeightAtLeast(20))
for _, m := range miner {
m.Pause()
}
ch, err := client.ChainNotify(ctx)
require.NoError(err)
// Wait for 5 seconds of inactivity
func() {
for {
select {
case <-ch:
continue
case <-time.After(5 * time.Second):
return
}
}
}()
sort.Ints(tsHeights)
// because of the deferred execution, the last tipset is not executed yet,
// and the one before the last one is the last executed tipset,
// which corresponds to the "latest" tag in EthGetBlockByNumber
latestBlk := ethtypes.EthUint64(tsHeights[len(tsHeights)-2])
blk, err := client.EthGetBlockByNumber(ctx, "latest", false)
require.NoError(err)
require.Equal(blk.Number, latestBlk)
assertHistory := func(history *ethtypes.EthFeeHistory, requestAmount, startHeight int) {
amount, oldest := calculateExpectations(tsHeights, requestAmount, startHeight)
require.Equal(amount+1, len(history.BaseFeePerGas))
require.Equal(amount, len(history.GasUsedRatio))
require.Equal(ethtypes.EthUint64(oldest), history.OldestBlock)
}
history, err := client.EthFeeHistory(ctx, result.Wrap[jsonrpc.RawParams](
json.Marshal([]interface{}{5, "0x10"}),
).Assert(require.NoError))
require.NoError(err)
require.Equal(6, len(history.BaseFeePerGas))
require.Equal(5, len(history.GasUsedRatio))
require.Equal(ethtypes.EthUint64(16-5+1), history.OldestBlock)
assertHistory(&history, 5, 16)
require.Nil(history.Reward)
history, err = client.EthFeeHistory(ctx, result.Wrap[jsonrpc.RawParams](
json.Marshal([]interface{}{"5", "0x10"}),
).Assert(require.NoError))
require.NoError(err)
require.Equal(6, len(history.BaseFeePerGas))
require.Equal(5, len(history.GasUsedRatio))
require.Equal(ethtypes.EthUint64(16-5+1), history.OldestBlock)
assertHistory(&history, 5, 16)
require.Nil(history.Reward)
history, err = client.EthFeeHistory(ctx, result.Wrap[jsonrpc.RawParams](
json.Marshal([]interface{}{5, "latest"}),
).Assert(require.NoError))
require.NoError(err)
assertHistory(&history, 5, int(latestBlk))
require.Nil(history.Reward)
history, err = client.EthFeeHistory(ctx, result.Wrap[jsonrpc.RawParams](
json.Marshal([]interface{}{"0x10", "0x12"}),
).Assert(require.NoError))
require.NoError(err)
require.Equal(17, len(history.BaseFeePerGas))
require.Equal(16, len(history.GasUsedRatio))
require.Equal(ethtypes.EthUint64(18-16+1), history.OldestBlock)
assertHistory(&history, 16, 18)
require.Nil(history.Reward)
history, err = client.EthFeeHistory(ctx, result.Wrap[jsonrpc.RawParams](
json.Marshal([]interface{}{5, "0x10"}),
).Assert(require.NoError))
require.NoError(err)
require.Equal(6, len(history.BaseFeePerGas))
require.Equal(5, len(history.GasUsedRatio))
require.Equal(ethtypes.EthUint64(16-5+1), history.OldestBlock)
assertHistory(&history, 5, 16)
require.Nil(history.Reward)
history, err = client.EthFeeHistory(ctx, result.Wrap[jsonrpc.RawParams](
json.Marshal([]interface{}{5, "10"}),
).Assert(require.NoError))
require.NoError(err)
require.Equal(6, len(history.BaseFeePerGas))
require.Equal(5, len(history.GasUsedRatio))
require.Equal(ethtypes.EthUint64(10-5+1), history.OldestBlock)
assertHistory(&history, 5, 10)
require.Nil(history.Reward)
// test when the requested number of blocks exceeds the chain length
history, err = client.EthFeeHistory(ctx, result.Wrap[jsonrpc.RawParams](
json.Marshal([]interface{}{"0x30", "latest"}),
).Assert(require.NoError))
require.NoError(err)
assertHistory(&history, 48, int(latestBlk))
require.Nil(history.Reward)
// test when the requested number of blocks exceeds the chain length
history, err = client.EthFeeHistory(ctx, result.Wrap[jsonrpc.RawParams](
json.Marshal([]interface{}{"0x30", "10"}),
).Assert(require.NoError))
require.NoError(err)
assertHistory(&history, 48, 10)
require.Nil(history.Reward)
history, err = client.EthFeeHistory(ctx, result.Wrap[jsonrpc.RawParams](
json.Marshal([]interface{}{5, "10", &[]float64{25, 50, 75}}),
).Assert(require.NoError))
require.NoError(err)
require.Equal(6, len(history.BaseFeePerGas))
require.Equal(5, len(history.GasUsedRatio))
require.Equal(ethtypes.EthUint64(10-5+1), history.OldestBlock)
assertHistory(&history, 5, 10)
require.NotNil(history.Reward)
require.Equal(5, len(*history.Reward))
for _, arr := range *history.Reward {
require.Equal(3, len(arr))
for _, item := range arr {
require.Equal(ethtypes.EthBigInt(types.NewInt(full.MinGasPremium)), item)
}
}
history, err = client.EthFeeHistory(ctx, result.Wrap[jsonrpc.RawParams](
@ -93,6 +183,11 @@ func TestEthFeeHistory(t *testing.T) {
).Assert(require.NoError))
require.Error(err)
history, err = client.EthFeeHistory(ctx, result.Wrap[jsonrpc.RawParams](
json.Marshal([]interface{}{5, "10", &[]float64{75, 50}}),
).Assert(require.NoError))
require.Error(err)
history, err = client.EthFeeHistory(ctx, result.Wrap[jsonrpc.RawParams](
json.Marshal([]interface{}{5, "10", &[]float64{}}),
).Assert(require.NoError))

View File

@ -336,6 +336,8 @@ func (n *Ensemble) Worker(minerNode *TestMiner, worker *TestWorker, opts ...Node
MinerNode: minerNode,
RemoteListener: rl,
options: options,
Stop: func(ctx context.Context) error { return nil },
}
n.inactive.workers = append(n.inactive.workers, worker)

View File

@ -96,6 +96,12 @@ func workerRpc(t *testing.T, m *TestWorker) *TestWorker {
require.NoError(t, err)
t.Cleanup(stop)
m.Stop = func(ctx context.Context) error {
srv.Close()
srv.CloseClientConnections()
return nil
}
m.ListenAddr, m.Worker = maddr, cl
return m
}

View File

@ -366,7 +366,7 @@ func TestWindowPostWorkerManualPoSt(t *testing.T) {
sectors := 2 * 48 * 2
client, miner, _, ens := kit.EnsembleWorker(t,
client, miner, _, _ := kit.EnsembleWorker(t,
kit.PresealSectors(sectors), // 2 sectors per partition, 2 partitions in all 48 deadlines
kit.LatestActorsAt(-1),
kit.ThroughRPC(),
@ -378,17 +378,8 @@ func TestWindowPostWorkerManualPoSt(t *testing.T) {
di, err := client.StateMinerProvingDeadline(ctx, maddr, types.EmptyTSK)
require.NoError(t, err)
bm := ens.InterconnectAll().BeginMiningMustPost(2 * time.Millisecond)[0]
di = di.NextNotElapsed()
t.Log("Running one proving period")
waitUntil := di.Open + di.WPoStChallengeWindow*2 - 2
client.WaitTillChain(ctx, kit.HeightAtLeast(waitUntil))
t.Log("Waiting for post message")
bm.Stop()
tryDl := func(dl uint64) {
p, err := miner.ComputeWindowPoSt(ctx, dl, types.EmptyTSK)
require.NoError(t, err)
@ -398,10 +389,48 @@ func TestWindowPostWorkerManualPoSt(t *testing.T) {
tryDl(0)
tryDl(40)
tryDl(di.Index + 4)
}
lastPending, err := client.MpoolPending(ctx, types.EmptyTSK)
func TestWindowPostWorkerDisconnected(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
_ = logging.SetLogLevel("storageminer", "INFO")
sectors := 2 * 48 * 2
_, miner, badWorker, ens := kit.EnsembleWorker(t,
kit.PresealSectors(sectors), // 2 sectors per partition, 2 partitions in all 48 deadlines
kit.LatestActorsAt(-1),
kit.ThroughRPC(),
kit.WithTaskTypes([]sealtasks.TaskType{sealtasks.TTGenerateWindowPoSt}))
var goodWorker kit.TestWorker
ens.Worker(miner, &goodWorker, kit.WithTaskTypes([]sealtasks.TaskType{sealtasks.TTGenerateWindowPoSt}), kit.ThroughRPC()).Start()
// wait for all workers to register
require.Eventually(t, func() bool {
w, err := miner.WorkerStats(ctx)
require.NoError(t, err)
return len(w) == 3 // 2 post + 1 miner-builtin
}, 10*time.Second, 100*time.Millisecond)
tryDl := func(dl uint64) {
p, err := miner.ComputeWindowPoSt(ctx, dl, types.EmptyTSK)
require.NoError(t, err)
require.Len(t, p, 1)
require.Equal(t, dl, p[0].Deadline)
}
tryDl(0) // this will run on the not-yet-bad badWorker
err := badWorker.Stop(ctx)
require.NoError(t, err)
require.Len(t, lastPending, 0)
tryDl(10) // will fail on the badWorker, then should retry on the goodWorker
time.Sleep(15 * time.Second)
tryDl(40) // after HeartbeatInterval, the badWorker should be marked as disabled
}
func TestSchedulerRemoveRequest(t *testing.T) {

View File

@ -221,7 +221,7 @@ type RpcReader struct {
res chan readRes
beginOnce *sync.Once
closeOnce sync.Once
closeOnce *sync.Once
}
var ErrHasBody = errors.New("RPCReader has body, either already read from or from a client with no redirect support")
@ -265,6 +265,7 @@ func (w *RpcReader) beginPost() {
w.postBody = nr.postBody
w.res = nr.res
w.beginOnce = nr.beginOnce
w.closeOnce = nr.closeOnce
}
}
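
Making closeOnce a pointer (and copying it across in beginPost) matters because the once-state must survive the handoff between RpcReader instances: a sync.Once value field would be copied, and each copy would fire its Do callback independently. A self-contained sketch of the difference, with hypothetical names:

package main

import (
	"fmt"
	"sync"
)

// reader is a stand-in for RpcReader; only the once-state is modeled.
type reader struct {
	closeOnce *sync.Once
}

func main() {
	shared := &sync.Once{}
	a := reader{closeOnce: shared}
	b := reader{closeOnce: shared} // state handed off, e.g. on redirect

	a.closeOnce.Do(func() { fmt.Println("closed by a") }) // runs
	b.closeOnce.Do(func() { fmt.Println("closed by b") }) // suppressed

	// With a sync.Once value field, b would hold an independent copy
	// and both callbacks would run.
}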
@ -355,6 +356,7 @@ func ReaderParamDecoder() (http.HandlerFunc, jsonrpc.ServerOption) {
res: make(chan readRes),
next: ch,
beginOnce: &sync.Once{},
closeOnce: &sync.Once{},
}
switch req.Method {

View File

@ -3,8 +3,8 @@ package rpcenc
import (
"context"
"fmt"
"io"
"io/ioutil"
"net/http/httptest"
"strings"
"sync"
@ -77,7 +77,12 @@ func (h *ReaderHandler) CloseReader(ctx context.Context, r io.Reader) error {
}
func (h *ReaderHandler) ReadAll(ctx context.Context, r io.Reader) ([]byte, error) {
return ioutil.ReadAll(r)
b, err := io.ReadAll(r)
if err != nil {
return nil, xerrors.Errorf("readall: %w", err)
}
return b, nil
}
func (h *ReaderHandler) ReadNullLen(ctx context.Context, r io.Reader) (int64, error) {
@ -219,9 +224,15 @@ func TestReaderRedirect(t *testing.T) {
}
func TestReaderRedirectDrop(t *testing.T) {
for i := 0; i < 10; i++ {
t.Run(fmt.Sprintf("test %d", i), testReaderRedirectDrop)
}
}
func testReaderRedirectDrop(t *testing.T) {
// lower the timeout so that the dangling connection between client and reader is dropped quickly
// after the test; otherwise httptest's Close blocks.
Timeout = 200 * time.Millisecond
Timeout = 90 * time.Millisecond
var allClient struct {
ReadAll func(ctx context.Context, r io.Reader) ([]byte, error)
@ -294,6 +305,8 @@ func TestReaderRedirectDrop(t *testing.T) {
done.Wait()
fmt.Println("---------------------")
// Redir client drops before subcall
done.Add(1)
@ -322,5 +335,9 @@ func TestReaderRedirectDrop(t *testing.T) {
// wait for subcall to finish
<-contCh
require.ErrorContains(t, allServerHandler.subErr, "decoding params for 'ReaderHandler.ReadAll' (param: 0; custom decoder): context canceled")
estr := allServerHandler.subErr.Error()
require.True(t,
strings.Contains(estr, "decoding params for 'ReaderHandler.ReadAll' (param: 0; custom decoder): context canceled") ||
strings.Contains(estr, "readall: unexpected EOF"), "unexpected error: %s", estr)
}

View File

@ -748,3 +748,14 @@ func (a *ChainAPI) ChainPrune(ctx context.Context, opts api.PruneOpts) error {
return pruner.PruneChain(opts)
}
func (a *ChainAPI) ChainHotGC(ctx context.Context, opts api.HotGCOpts) error {
pruner, ok := a.BaseBlockstore.(interface {
GCHotStore(api.HotGCOpts) error
})
if !ok {
return xerrors.Errorf("base blockstore does not support hot GC (%T)", a.BaseBlockstore)
}
return pruner.GCHotStore(opts)
}
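
ChainHotGC uses the same optional-capability pattern as ChainPrune above: the blockstore is type-asserted against an anonymous interface listing just the method needed, so stores without hot-GC support fail with a clear error instead of a panic. A generic, self-contained sketch of the pattern (all names hypothetical):

package main

import "fmt"

type blockstore interface {
	Get(key string) ([]byte, error)
}

// gcStore implements blockstore plus an optional GC capability.
type gcStore struct{}

func (gcStore) Get(string) ([]byte, error) { return nil, nil }
func (gcStore) GCHot() error               { fmt.Println("hot GC ran"); return nil }

func runHotGC(bs blockstore) error {
	// Assert only the capability we need; degrade gracefully otherwise.
	gc, ok := bs.(interface{ GCHot() error })
	if !ok {
		return fmt.Errorf("blockstore %T does not support hot GC", bs)
	}
	return gc.GCHot()
}

func main() {
	if err := runHotGC(gcStore{}); err != nil {
		fmt.Println(err)
	}
}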

View File

@ -252,7 +252,7 @@ func (a *EthModule) parseBlkParam(ctx context.Context, blkParam string) (tipset
if err != nil {
return nil, fmt.Errorf("cannot parse block number: %v", err)
}
ts, err := a.Chain.GetTipsetByHeight(ctx, abi.ChainEpoch(num), nil, false)
ts, err := a.Chain.GetTipsetByHeight(ctx, abi.ChainEpoch(num), nil, true)
if err != nil {
return nil, fmt.Errorf("cannot get tipset at height: %v", num)
}
@ -681,11 +681,7 @@ func (a *EthModule) EthFeeHistory(ctx context.Context, p jsonrpc.RawParams) (eth
return ethtypes.EthFeeHistory{}, fmt.Errorf("bad block parameter %s: %s", params.NewestBlkNum, err)
}
// Deal with the case that the chain is shorter than the number of requested blocks.
oldestBlkHeight := uint64(1)
if abi.ChainEpoch(params.BlkCount) <= ts.Height() {
oldestBlkHeight = uint64(ts.Height()) - uint64(params.BlkCount) + 1
}
// NOTE: baseFeePerGas should include the next block after the newest of the returned range,
// because the next base fee can be inferred from the messages in the newest block.
@ -695,29 +691,32 @@ func (a *EthModule) EthFeeHistory(ctx context.Context, p jsonrpc.RawParams) (eth
gasUsedRatioArray := []float64{}
rewardsArray := make([][]ethtypes.EthBigInt, 0)
for ts.Height() >= abi.ChainEpoch(oldestBlkHeight) {
// Unfortunately we need to rebuild the full message view so we can
// totalize gas used in the tipset.
msgs, err := a.Chain.MessagesForTipset(ctx, ts)
blocksIncluded := 0
for blocksIncluded < int(params.BlkCount) && ts.Height() > 0 {
compOutput, err := a.StateCompute(ctx, ts.Height(), nil, ts.Key())
if err != nil {
return ethtypes.EthFeeHistory{}, xerrors.Errorf("error loading messages for tipset: %v: %w", ts, err)
return ethtypes.EthFeeHistory{}, xerrors.Errorf("cannot lookup the status of tipset: %v: %w", ts, err)
}
txGasRewards := gasRewardSorter{}
for txIdx, msg := range msgs {
msgLookup, err := a.StateAPI.StateSearchMsg(ctx, types.EmptyTSK, msg.Cid(), api.LookbackNoLimit, false)
if err != nil || msgLookup == nil {
return ethtypes.EthFeeHistory{}, nil
for _, msg := range compOutput.Trace {
if msg.Msg.From == builtintypes.SystemActorAddr {
continue
}
tx, err := newEthTxFromMessageLookup(ctx, msgLookup, txIdx, a.Chain, a.StateAPI)
smsgCid, err := getSignedMessage(ctx, a.Chain, msg.MsgCid)
if err != nil {
return ethtypes.EthFeeHistory{}, nil
return ethtypes.EthFeeHistory{}, xerrors.Errorf("failed to get signed msg %s: %w", msg.MsgCid, err)
}
tx, err := newEthTxFromSignedMessage(ctx, smsgCid, a.StateAPI)
if err != nil {
return ethtypes.EthFeeHistory{}, err
}
txGasRewards = append(txGasRewards, gasRewardTuple{
reward: tx.Reward(ts.Blocks()[0].ParentBaseFee),
gas: uint64(msgLookup.Receipt.GasUsed),
gas: uint64(msg.MsgRct.GasUsed),
})
}
@ -727,6 +726,8 @@ func (a *EthModule) EthFeeHistory(ctx context.Context, p jsonrpc.RawParams) (eth
baseFeeArray = append(baseFeeArray, ethtypes.EthBigInt(ts.Blocks()[0].ParentBaseFee))
gasUsedRatioArray = append(gasUsedRatioArray, float64(totalGasUsed)/float64(build.BlockGasLimit))
rewardsArray = append(rewardsArray, rewards)
oldestBlkHeight = uint64(ts.Height())
blocksIncluded++
parentTsKey := ts.Parents()
ts, err = a.Chain.LoadTipSet(ctx, parentTsKey)
@ -1763,11 +1764,7 @@ func (e *ethSubscription) stop() {
}
func newEthBlockFromFilecoinTipSet(ctx context.Context, ts *types.TipSet, fullTxInfo bool, cs *store.ChainStore, sa StateAPI) (ethtypes.EthBlock, error) {
parent, err := cs.LoadTipSet(ctx, ts.Parents())
if err != nil {
return ethtypes.EthBlock{}, err
}
parentKeyCid, err := parent.Key().Cid()
parentKeyCid, err := ts.Parents().Cid()
if err != nil {
return ethtypes.EthBlock{}, err
}
@ -2347,7 +2344,7 @@ func calculateRewardsAndGasUsed(rewardPercentiles []float64, txGasRewards gasRew
rewards := make([]ethtypes.EthBigInt, len(rewardPercentiles))
for i := range rewards {
rewards[i] = ethtypes.EthBigIntZero
rewards[i] = ethtypes.EthBigInt(types.NewInt(MinGasPremium))
}
if len(txGasRewards) == 0 {

View File

@ -135,7 +135,7 @@ func TestRewardPercentiles(t *testing.T) {
{
percentiles: []float64{25, 50, 75},
txGasRewards: []gasRewardTuple{},
answer: []int64{0, 0, 0},
answer: []int64{MinGasPremium, MinGasPremium, MinGasPremium},
},
{
percentiles: []float64{25, 50, 75, 100},

View File

@ -6,8 +6,10 @@ import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"net/http"
"os"
"reflect"
"time"
@ -19,9 +21,15 @@ import (
"github.com/filecoin-project/lotus/api"
)
var errSectorRemoved = errors.New("sector removed")
func (m *Sealing) Plan(events []statemachine.Event, user interface{}) (interface{}, uint64, error) {
next, processed, err := m.plan(events, user.(*SectorInfo))
if err != nil || next == nil {
if err == errSectorRemoved && os.Getenv("LOTUS_KEEP_REMOVED_FSM_ACTIVE") != "1" {
return nil, processed, statemachine.ErrTerminated
}
l := Log{
Timestamp: uint64(time.Now().Unix()),
Message: fmt.Sprintf("state machine error: %s", err),
@ -601,7 +609,7 @@ func (m *Sealing) plan(events []statemachine.Event, state *SectorInfo) (func(sta
case Removing:
return m.handleRemoving, processed, nil
case Removed:
return nil, processed, nil
return nil, processed, errSectorRemoved
case RemoveFailed:
return m.handleRemoveFailed, processed, nil
@ -615,13 +623,14 @@ func (m *Sealing) plan(events []statemachine.Event, state *SectorInfo) (func(sta
// Fatal errors
case UndefinedSectorState:
log.Error("sector update with undefined state!")
return nil, processed, xerrors.Errorf("sector update with undefined state")
case FailedUnrecoverable:
log.Errorf("sector %d failed unrecoverably", state.SectorNumber)
return nil, processed, xerrors.Errorf("sector %d failed unrecoverably", state.SectorNumber)
default:
log.Errorf("unexpected sector update state: %s", state.State)
return nil, processed, xerrors.Errorf("unexpected sector update state: %s", state.State)
}
return nil, processed, nil
}
func (m *Sealing) onUpdateSector(ctx context.Context, state *SectorInfo) error {

View File

@ -289,14 +289,20 @@ func (m *Manager) ServeHTTP(w http.ResponseWriter, r *http.Request) {
m.remoteHnd.ServeHTTP(w, r)
}
func schedNop(context.Context, Worker) error {
return nil
var schedNop = PrepareAction{
Action: func(ctx context.Context, w Worker) error {
return nil
},
PrepType: sealtasks.TTNoop,
}
func (m *Manager) schedFetch(sector storiface.SectorRef, ft storiface.SectorFileType, ptype storiface.PathType, am storiface.AcquireMode) func(context.Context, Worker) error {
return func(ctx context.Context, worker Worker) error {
_, err := m.waitSimpleCall(ctx)(worker.Fetch(ctx, sector, ft, ptype, am))
return err
func (m *Manager) schedFetch(sector storiface.SectorRef, ft storiface.SectorFileType, ptype storiface.PathType, am storiface.AcquireMode) PrepareAction {
return PrepareAction{
Action: func(ctx context.Context, worker Worker) error {
_, err := m.waitSimpleCall(ctx)(worker.Fetch(ctx, sector, ft, ptype, am))
return err
},
PrepType: sealtasks.TTFetch,
}
}
@ -315,16 +321,19 @@ func (m *Manager) SectorsUnsealPiece(ctx context.Context, sector storiface.Secto
// if the selected worker does NOT have the sealed files for the sector, instruct it to fetch them from a worker that has them and
// put them in the sealing scratch space.
sealFetch := func(ctx context.Context, worker Worker) error {
log.Debugf("copy sealed/cache sector data for sector %d", sector.ID)
_, err := m.waitSimpleCall(ctx)(worker.Fetch(ctx, sector, storiface.FTSealed|storiface.FTCache, storiface.PathSealing, storiface.AcquireCopy))
_, err2 := m.waitSimpleCall(ctx)(worker.Fetch(ctx, sector, storiface.FTUpdate|storiface.FTUpdateCache, storiface.PathSealing, storiface.AcquireCopy))
sealFetch := PrepareAction{
Action: func(ctx context.Context, worker Worker) error {
log.Debugf("copy sealed/cache sector data for sector %d", sector.ID)
_, err := m.waitSimpleCall(ctx)(worker.Fetch(ctx, sector, storiface.FTSealed|storiface.FTCache, storiface.PathSealing, storiface.AcquireCopy))
_, err2 := m.waitSimpleCall(ctx)(worker.Fetch(ctx, sector, storiface.FTUpdate|storiface.FTUpdateCache, storiface.PathSealing, storiface.AcquireCopy))
if err != nil && err2 != nil {
return xerrors.Errorf("cannot unseal piece. error fetching sealed data: %w. error fetching replica data: %w", err, err2)
}
if err != nil && err2 != nil {
return xerrors.Errorf("cannot unseal piece. error fetching sealed data: %w. error fetching replica data: %w", err, err2)
}
return nil
return nil
},
PrepType: sealtasks.TTFetch,
}
if unsealed == nil {

View File

@ -196,7 +196,7 @@ func (m *Manager) generateWindowPoSt(ctx context.Context, minerID abi.ActorID, s
skipped = append(skipped, sk...)
if err != nil {
retErr = multierr.Append(retErr, xerrors.Errorf("partitionCount:%d err:%+v", partIdx, err))
retErr = multierr.Append(retErr, xerrors.Errorf("partitionIndex:%d err:%+v", partIdx, err))
}
flk.Unlock()
}

View File

@ -42,6 +42,10 @@ func WithPriority(ctx context.Context, priority int) context.Context {
const mib = 1 << 20
type WorkerAction func(ctx context.Context, w Worker) error
type PrepareAction struct {
Action WorkerAction
PrepType sealtasks.TaskType
}
type SchedWorker interface {
TaskTypes(context.Context) (map[sealtasks.TaskType]struct{}, error)
@ -130,7 +134,7 @@ type WorkerRequest struct {
Sel WorkerSelector
SchedId uuid.UUID
prepare WorkerAction
prepare PrepareAction
work WorkerAction
start time.Time
@ -157,7 +161,15 @@ func newScheduler(ctx context.Context, assigner string) (*Scheduler, error) {
case "", "utilization":
a = NewLowestUtilizationAssigner()
case "spread":
a = NewSpreadAssigner()
a = NewSpreadAssigner(false)
case "experiment-spread-qcount":
a = NewSpreadAssigner(true)
case "experiment-spread-tasks":
a = NewSpreadTasksAssigner(false)
case "experiment-spread-tasks-qcount":
a = NewSpreadTasksAssigner(true)
case "experiment-random":
a = NewRandomAssigner()
default:
return nil, xerrors.Errorf("unknown assigner '%s'", assigner)
}
@ -189,7 +201,7 @@ func newScheduler(ctx context.Context, assigner string) (*Scheduler, error) {
}, nil
}
func (sh *Scheduler) Schedule(ctx context.Context, sector storiface.SectorRef, taskType sealtasks.TaskType, sel WorkerSelector, prepare WorkerAction, work WorkerAction) error {
func (sh *Scheduler) Schedule(ctx context.Context, sector storiface.SectorRef, taskType sealtasks.TaskType, sel WorkerSelector, prepare PrepareAction, work WorkerAction) error {
ret := make(chan workerResponse)
select {
@ -239,6 +251,13 @@ func (r *WorkerRequest) SealTask() sealtasks.SealTaskType {
}
}
func (r *WorkerRequest) PrepSealTask() sealtasks.SealTaskType {
return sealtasks.SealTaskType{
TaskType: r.prepare.PrepType,
RegisteredSealProof: r.Sector.ProofType,
}
}
type SchedDiagRequestInfo struct {
Sector abi.SectorID
TaskType sealtasks.TaskType

View File

@ -58,7 +58,7 @@ func (a *AssignerCommon) TrySched(sh *Scheduler) {
windows := make([]SchedWindow, windowsLen)
for i := range windows {
windows[i].Allocated = *NewActiveResources()
windows[i].Allocated = *NewActiveResources(newTaskCounter())
}
acceptableWindows := make([][]int, queueLen) // QueueIndex -> []OpenWindowIndex

View File

@ -0,0 +1,88 @@
package sealer
import (
"math/rand"
"github.com/filecoin-project/lotus/storage/sealer/storiface"
)
func NewRandomAssigner() Assigner {
return &AssignerCommon{
WindowSel: RandomWS,
}
}
func RandomWS(sh *Scheduler, queueLen int, acceptableWindows [][]int, windows []SchedWindow) int {
scheduled := 0
rmQueue := make([]int, 0, queueLen)
for sqi := 0; sqi < queueLen; sqi++ {
task := (*sh.SchedQueue)[sqi]
//bestAssigned := math.MaxInt // smaller = better
type choice struct {
selectedWindow int
needRes storiface.Resources
info storiface.WorkerInfo
bestWid storiface.WorkerID
}
choices := make([]choice, 0, len(acceptableWindows[task.IndexHeap]))
for i, wnd := range acceptableWindows[task.IndexHeap] {
wid := sh.OpenWindows[wnd].Worker
w := sh.Workers[wid]
res := w.Info.Resources.ResourceSpec(task.Sector.ProofType, task.TaskType)
log.Debugf("SCHED try assign sqi:%d sector %d to window %d (awi:%d)", sqi, task.Sector.ID.Number, wnd, i)
if !windows[wnd].Allocated.CanHandleRequest(task.SealTask(), res, wid, "schedAssign", w.Info) {
continue
}
choices = append(choices, choice{
selectedWindow: wnd,
needRes: res,
info: w.Info,
bestWid: wid,
})
}
if len(choices) == 0 {
// all windows full
continue
}
// choose randomly
randIndex := rand.Intn(len(choices))
selectedWindow := choices[randIndex].selectedWindow
needRes := choices[randIndex].needRes
info := choices[randIndex].info
bestWid := choices[randIndex].bestWid
log.Debugw("SCHED ASSIGNED",
"assigner", "darts",
"sqi", sqi,
"sector", task.Sector.ID.Number,
"task", task.TaskType,
"window", selectedWindow,
"worker", bestWid,
"choices", len(choices))
windows[selectedWindow].Allocated.Add(task.SealTask(), info.Resources, needRes)
windows[selectedWindow].Todo = append(windows[selectedWindow].Todo, task)
rmQueue = append(rmQueue, sqi)
scheduled++
}
if len(rmQueue) > 0 {
for i := len(rmQueue) - 1; i >= 0; i-- {
sh.SchedQueue.Remove(rmQueue[i])
}
}
return scheduled
}

View File

@ -6,76 +6,84 @@ import (
"github.com/filecoin-project/lotus/storage/sealer/storiface"
)
func NewSpreadAssigner() Assigner {
func NewSpreadAssigner(queued bool) Assigner {
return &AssignerCommon{
WindowSel: SpreadWS,
WindowSel: SpreadWS(queued),
}
}
func SpreadWS(sh *Scheduler, queueLen int, acceptableWindows [][]int, windows []SchedWindow) int {
scheduled := 0
rmQueue := make([]int, 0, queueLen)
workerAssigned := map[storiface.WorkerID]int{}
func SpreadWS(queued bool) func(sh *Scheduler, queueLen int, acceptableWindows [][]int, windows []SchedWindow) int {
return func(sh *Scheduler, queueLen int, acceptableWindows [][]int, windows []SchedWindow) int {
scheduled := 0
rmQueue := make([]int, 0, queueLen)
workerAssigned := map[storiface.WorkerID]int{}
for sqi := 0; sqi < queueLen; sqi++ {
task := (*sh.SchedQueue)[sqi]
for sqi := 0; sqi < queueLen; sqi++ {
task := (*sh.SchedQueue)[sqi]
selectedWindow := -1
var needRes storiface.Resources
var info storiface.WorkerInfo
var bestWid storiface.WorkerID
bestAssigned := math.MaxInt // smaller = better
selectedWindow := -1
var needRes storiface.Resources
var info storiface.WorkerInfo
var bestWid storiface.WorkerID
bestAssigned := math.MaxInt // smaller = better
for i, wnd := range acceptableWindows[task.IndexHeap] {
wid := sh.OpenWindows[wnd].Worker
w := sh.Workers[wid]
for i, wnd := range acceptableWindows[task.IndexHeap] {
wid := sh.OpenWindows[wnd].Worker
w := sh.Workers[wid]
res := w.Info.Resources.ResourceSpec(task.Sector.ProofType, task.TaskType)
res := w.Info.Resources.ResourceSpec(task.Sector.ProofType, task.TaskType)
log.Debugf("SCHED try assign sqi:%d sector %d to window %d (awi:%d)", sqi, task.Sector.ID.Number, wnd, i)
log.Debugf("SCHED try assign sqi:%d sector %d to window %d (awi:%d)", sqi, task.Sector.ID.Number, wnd, i)
if !windows[wnd].Allocated.CanHandleRequest(task.SealTask(), res, wid, "schedAssign", w.Info) {
if !windows[wnd].Allocated.CanHandleRequest(task.SealTask(), res, wid, "schedAssign", w.Info) {
continue
}
wu, found := workerAssigned[wid]
if !found && queued {
wu = w.TaskCounts()
workerAssigned[wid] = wu
}
if wu >= bestAssigned {
continue
}
info = w.Info
needRes = res
bestWid = wid
selectedWindow = wnd
bestAssigned = wu
}
if selectedWindow < 0 {
// all windows full
continue
}
wu, _ := workerAssigned[wid]
if wu >= bestAssigned {
continue
log.Debugw("SCHED ASSIGNED",
"assigner", "spread",
"spread-queued", queued,
"sqi", sqi,
"sector", task.Sector.ID.Number,
"task", task.TaskType,
"window", selectedWindow,
"worker", bestWid,
"assigned", bestAssigned)
workerAssigned[bestWid]++
windows[selectedWindow].Allocated.Add(task.SealTask(), info.Resources, needRes)
windows[selectedWindow].Todo = append(windows[selectedWindow].Todo, task)
rmQueue = append(rmQueue, sqi)
scheduled++
}
if len(rmQueue) > 0 {
for i := len(rmQueue) - 1; i >= 0; i-- {
sh.SchedQueue.Remove(rmQueue[i])
}
info = w.Info
needRes = res
bestWid = wid
selectedWindow = wnd
bestAssigned = wu
}
if selectedWindow < 0 {
// all windows full
continue
}
log.Debugw("SCHED ASSIGNED",
"sqi", sqi,
"sector", task.Sector.ID.Number,
"task", task.TaskType,
"window", selectedWindow,
"worker", bestWid,
"assigned", bestAssigned)
workerAssigned[bestWid]++
windows[selectedWindow].Allocated.Add(task.SealTask(), info.Resources, needRes)
windows[selectedWindow].Todo = append(windows[selectedWindow].Todo, task)
rmQueue = append(rmQueue, sqi)
scheduled++
return scheduled
}
if len(rmQueue) > 0 {
for i := len(rmQueue) - 1; i >= 0; i-- {
sh.SchedQueue.Remove(rmQueue[i])
}
}
return scheduled
}

View File

@ -0,0 +1,98 @@
package sealer
import (
"math"
"github.com/filecoin-project/lotus/storage/sealer/sealtasks"
"github.com/filecoin-project/lotus/storage/sealer/storiface"
)
func NewSpreadTasksAssigner(queued bool) Assigner {
return &AssignerCommon{
WindowSel: SpreadTasksWS(queued),
}
}
type widTask struct {
wid storiface.WorkerID
tt sealtasks.TaskType
}
func SpreadTasksWS(queued bool) func(sh *Scheduler, queueLen int, acceptableWindows [][]int, windows []SchedWindow) int {
return func(sh *Scheduler, queueLen int, acceptableWindows [][]int, windows []SchedWindow) int {
scheduled := 0
rmQueue := make([]int, 0, queueLen)
workerAssigned := map[widTask]int{}
for sqi := 0; sqi < queueLen; sqi++ {
task := (*sh.SchedQueue)[sqi]
selectedWindow := -1
var needRes storiface.Resources
var info storiface.WorkerInfo
var bestWid widTask
bestAssigned := math.MaxInt // smaller = better
for i, wnd := range acceptableWindows[task.IndexHeap] {
wid := sh.OpenWindows[wnd].Worker
w := sh.Workers[wid]
res := w.Info.Resources.ResourceSpec(task.Sector.ProofType, task.TaskType)
log.Debugf("SCHED try assign sqi:%d sector %d to window %d (awi:%d)", sqi, task.Sector.ID.Number, wnd, i)
if !windows[wnd].Allocated.CanHandleRequest(task.SealTask(), res, wid, "schedAssign", w.Info) {
continue
}
wt := widTask{wid: wid, tt: task.TaskType}
wu, found := workerAssigned[wt]
if !found && queued {
st := task.SealTask()
wu = w.TaskCount(&st)
workerAssigned[wt] = wu
}
if wu >= bestAssigned {
continue
}
info = w.Info
needRes = res
bestWid = wt
selectedWindow = wnd
bestAssigned = wu
}
if selectedWindow < 0 {
// all windows full
continue
}
log.Debugw("SCHED ASSIGNED",
"assigner", "spread-tasks",
"spread-queued", queued,
"sqi", sqi,
"sector", task.Sector.ID.Number,
"task", task.TaskType,
"window", selectedWindow,
"worker", bestWid,
"assigned", bestAssigned)
workerAssigned[bestWid]++
windows[selectedWindow].Allocated.Add(task.SealTask(), info.Resources, needRes)
windows[selectedWindow].Todo = append(windows[selectedWindow].Todo, task)
rmQueue = append(rmQueue, sqi)
scheduled++
}
if len(rmQueue) > 0 {
for i := len(rmQueue) - 1; i >= 0; i-- {
sh.SchedQueue.Remove(rmQueue[i])
}
}
return scheduled
}
}

View File

@ -74,6 +74,7 @@ func LowestUtilizationWS(sh *Scheduler, queueLen int, acceptableWindows [][]int,
}
log.Debugw("SCHED ASSIGNED",
"assigner", "util",
"sqi", sqi,
"sector", task.Sector.ID.Number,
"task", task.TaskType,

View File

@ -2,12 +2,15 @@ package sealer
import (
"context"
"errors"
"math/rand"
"sync"
"time"
"github.com/hashicorp/go-multierror"
"golang.org/x/xerrors"
"github.com/filecoin-project/go-jsonrpc"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/lotus/storage/paths"
@ -102,15 +105,31 @@ func (ps *poStScheduler) Schedule(ctx context.Context, primary bool, spt abi.Reg
}
}()
selected := candidates[0]
worker := ps.workers[selected.id]
var rpcErrs error
return worker.active.withResources(selected.id, worker.Info, ps.postType.SealTask(spt), selected.res, &ps.lk, func() error {
ps.lk.Unlock()
defer ps.lk.Lock()
for i, selected := range candidates {
worker := ps.workers[selected.id]
return work(ctx, worker.workerRpc)
})
err := worker.active.withResources(selected.id, worker.Info, ps.postType.SealTask(spt), selected.res, &ps.lk, func() error {
ps.lk.Unlock()
defer ps.lk.Lock()
return work(ctx, worker.workerRpc)
})
if err == nil {
return nil
}
// If the error is an RPCConnectionError, try another worker; otherwise, return the error
if !errors.As(err, new(*jsonrpc.RPCConnectionError)) {
return err
}
log.Warnw("worker RPC connection error, will retry with another candidate if possible", "error", err, "worker", selected.id, "candidate", i, "candidates", len(candidates))
rpcErrs = multierror.Append(rpcErrs, err)
}
return xerrors.Errorf("got RPC errors from all workers: %w", rpcErrs)
}
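
The loop above encodes a simple failover policy: an RPC connection error is recorded and the next ready candidate is tried, while any other error aborts immediately. The same control flow in isolation, with a hypothetical error type standing in for jsonrpc.RPCConnectionError:

package main

import (
	"errors"
	"fmt"
)

// connError stands in for a connection-level RPC failure.
type connError struct{ msg string }

func (e *connError) Error() string { return e.msg }

// tryCandidates runs workers in order, retrying only on connection errors.
func tryCandidates(workers []func() error) error {
	var errs error
	for _, w := range workers {
		err := w()
		if err == nil {
			return nil
		}
		if !errors.As(err, new(*connError)) {
			return err // non-connection errors are not retryable
		}
		errs = errors.Join(errs, err)
	}
	return fmt.Errorf("all candidates failed: %w", errs)
}

func main() {
	err := tryCandidates([]func() error{
		func() error { return &connError{"worker 1 unreachable"} },
		func() error { return nil }, // second candidate succeeds
	})
	fmt.Println(err) // <nil>
}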
type candidateWorker struct {
@ -124,6 +143,11 @@ func (ps *poStScheduler) readyWorkers(spt abi.RegisteredSealProof) (bool, []cand
for wid, wr := range ps.workers {
needRes := wr.Info.Resources.ResourceSpec(spt, ps.postType)
if !wr.Enabled {
log.Debugf("sched: not scheduling on PoSt-worker %s, worker disabled", wid)
continue
}
if !wr.active.CanHandleRequest(ps.postType.SealTask(spt), needRes, wid, "post-readyWorkers", wr.Info) {
continue
}

View File

@ -13,18 +13,68 @@ type ActiveResources struct {
gpuUsed float64
cpuUse uint64
taskCounters map[sealtasks.SealTaskType]int
taskCounters *taskCounter
cond *sync.Cond
waiting int
}
func NewActiveResources() *ActiveResources {
return &ActiveResources{
type taskCounter struct {
taskCounters map[sealtasks.SealTaskType]int
// this lock is technically redundant, as ActiveResources is always accessed
// under the worker lock, but it keeps the counter safe if that ever changes
lk sync.Mutex
}
func newTaskCounter() *taskCounter {
return &taskCounter{
taskCounters: map[sealtasks.SealTaskType]int{},
}
}
func (tc *taskCounter) Add(tt sealtasks.SealTaskType) {
tc.lk.Lock()
defer tc.lk.Unlock()
tc.taskCounters[tt]++
}
func (tc *taskCounter) Free(tt sealtasks.SealTaskType) {
tc.lk.Lock()
defer tc.lk.Unlock()
tc.taskCounters[tt]--
}
func (tc *taskCounter) Get(tt sealtasks.SealTaskType) int {
tc.lk.Lock()
defer tc.lk.Unlock()
return tc.taskCounters[tt]
}
func (tc *taskCounter) Sum() int {
tc.lk.Lock()
defer tc.lk.Unlock()
sum := 0
for _, v := range tc.taskCounters {
sum += v
}
return sum
}
func (tc *taskCounter) ForEach(cb func(tt sealtasks.SealTaskType, count int)) {
tc.lk.Lock()
defer tc.lk.Unlock()
for tt, count := range tc.taskCounters {
cb(tt, count)
}
}
func NewActiveResources(tc *taskCounter) *ActiveResources {
return &ActiveResources{
taskCounters: tc,
}
}
func (a *ActiveResources) withResources(id storiface.WorkerID, wr storiface.WorkerInfo, tt sealtasks.SealTaskType, r storiface.Resources, locker sync.Locker, cb func() error) error {
for !a.CanHandleRequest(tt, r, id, "withResources", wr) {
if a.cond == nil {
@ -59,7 +109,7 @@ func (a *ActiveResources) Add(tt sealtasks.SealTaskType, wr storiface.WorkerReso
a.cpuUse += r.Threads(wr.CPUs, len(wr.GPUs))
a.memUsedMin += r.MinMemory
a.memUsedMax += r.MaxMemory
a.taskCounters[tt]++
a.taskCounters.Add(tt)
return a.utilization(wr) - startUtil
}
@ -71,7 +121,7 @@ func (a *ActiveResources) Free(tt sealtasks.SealTaskType, wr storiface.WorkerRes
a.cpuUse -= r.Threads(wr.CPUs, len(wr.GPUs))
a.memUsedMin -= r.MinMemory
a.memUsedMax -= r.MaxMemory
a.taskCounters[tt]--
a.taskCounters.Free(tt)
if a.cond != nil {
a.cond.Broadcast()
@ -82,8 +132,8 @@ func (a *ActiveResources) Free(tt sealtasks.SealTaskType, wr storiface.WorkerRes
// handle the request.
func (a *ActiveResources) CanHandleRequest(tt sealtasks.SealTaskType, needRes storiface.Resources, wid storiface.WorkerID, caller string, info storiface.WorkerInfo) bool {
if needRes.MaxConcurrent > 0 {
if a.taskCounters[tt] >= needRes.MaxConcurrent {
log.Debugf("sched: not scheduling on worker %s for %s; at task limit tt=%s, curcount=%d", wid, caller, tt, a.taskCounters[tt])
if a.taskCounters.Get(tt) >= needRes.MaxConcurrent {
log.Debugf("sched: not scheduling on worker %s for %s; at task limit tt=%s, curcount=%d", wid, caller, tt, a.taskCounters.Get(tt))
return false
}
}
@ -170,6 +220,15 @@ func (a *ActiveResources) utilization(wr storiface.WorkerResources) float64 { //
return max
}
func (a *ActiveResources) taskCount(tt *sealtasks.SealTaskType) int {
// nil means all tasks
if tt == nil {
return a.taskCounters.Sum()
}
return a.taskCounters.Get(*tt)
}
func (wh *WorkerHandle) Utilization() float64 {
wh.lk.Lock()
u := wh.active.utilization(wh.Info.Resources)
@ -183,3 +242,31 @@ func (wh *WorkerHandle) Utilization() float64 {
return u
}
func (wh *WorkerHandle) TaskCounts() int {
wh.lk.Lock()
u := wh.active.taskCount(nil)
u += wh.preparing.taskCount(nil)
wh.lk.Unlock()
wh.wndLk.Lock()
for _, window := range wh.activeWindows {
u += window.Allocated.taskCount(nil)
}
wh.wndLk.Unlock()
return u
}
func (wh *WorkerHandle) TaskCount(tt *sealtasks.SealTaskType) int {
wh.lk.Lock()
u := wh.active.taskCount(tt)
u += wh.preparing.taskCount(tt)
wh.lk.Unlock()
wh.wndLk.Lock()
for _, window := range wh.activeWindows {
u += window.Allocated.taskCount(tt)
}
wh.wndLk.Unlock()
return u
}

View File

@ -288,25 +288,30 @@ func TestSched(t *testing.T) {
ProofType: spt,
}
err := sched.Schedule(ctx, sectorRef, taskType, sel, func(ctx context.Context, w Worker) error {
wi, err := w.Info(ctx)
require.NoError(t, err)
prep := PrepareAction{
Action: func(ctx context.Context, w Worker) error {
wi, err := w.Info(ctx)
require.NoError(t, err)
require.Equal(t, expectWorker, wi.Hostname)
require.Equal(t, expectWorker, wi.Hostname)
log.Info("IN ", taskName)
log.Info("IN ", taskName)
for {
_, ok := <-done
if !ok {
break
for {
_, ok := <-done
if !ok {
break
}
}
}
log.Info("OUT ", taskName)
log.Info("OUT ", taskName)
return nil
}, noopAction)
return nil
},
PrepType: taskType,
}
err := sched.Schedule(ctx, sectorRef, taskType, sel, prep, noopAction)
if err != context.Canceled {
require.NoError(t, err, fmt.Sprint(l, l2))
}
@ -639,8 +644,8 @@ func BenchmarkTrySched(b *testing.B) {
Resources: decentWorkerResources,
},
Enabled: true,
preparing: NewActiveResources(),
active: NewActiveResources(),
preparing: NewActiveResources(newTaskCounter()),
active: NewActiveResources(newTaskCounter()),
}
for i := 0; i < windows; i++ {
@ -685,7 +690,7 @@ func TestWindowCompact(t *testing.T) {
for _, windowTasks := range start {
window := &SchedWindow{
Allocated: *NewActiveResources(),
Allocated: *NewActiveResources(newTaskCounter()),
}
for _, task := range windowTasks {
@ -708,7 +713,7 @@ func TestWindowCompact(t *testing.T) {
require.Equal(t, len(start)-len(expect), -sw.windowsRequested)
for wi, tasks := range expect {
expectRes := NewActiveResources()
expectRes := NewActiveResources(newTaskCounter())
for ti, task := range tasks {
require.Equal(t, task, wh.activeWindows[wi].Todo[ti].TaskType, "%d, %d", wi, ti)

View File

@ -30,12 +30,14 @@ func newWorkerHandle(ctx context.Context, w Worker) (*WorkerHandle, error) {
return nil, xerrors.Errorf("getting worker info: %w", err)
}
tc := newTaskCounter()
worker := &WorkerHandle{
workerRpc: w,
Info: info,
preparing: NewActiveResources(),
active: NewActiveResources(),
preparing: NewActiveResources(tc),
active: NewActiveResources(tc),
Enabled: true,
closingMgr: make(chan struct{}),
@ -352,8 +354,8 @@ assignLoop:
worker.lk.Lock()
for t, todo := range firstWindow.Todo {
needRes := worker.Info.Resources.ResourceSpec(todo.Sector.ProofType, todo.TaskType)
if worker.preparing.CanHandleRequest(todo.SealTask(), needRes, sw.wid, "startPreparing", worker.Info) {
needResPrep := worker.Info.Resources.PrepResourceSpec(todo.Sector.ProofType, todo.TaskType, todo.prepare.PrepType)
if worker.preparing.CanHandleRequest(todo.PrepSealTask(), needResPrep, sw.wid, "startPreparing", worker.Info) {
tidx = t
break
}
@ -452,20 +454,21 @@ func (sw *schedWorker) startProcessingTask(req *WorkerRequest) error {
w, sh := sw.worker, sw.sched
needRes := w.Info.Resources.ResourceSpec(req.Sector.ProofType, req.TaskType)
needResPrep := w.Info.Resources.PrepResourceSpec(req.Sector.ProofType, req.TaskType, req.prepare.PrepType)
w.lk.Lock()
w.preparing.Add(req.SealTask(), w.Info.Resources, needRes)
w.preparing.Add(req.PrepSealTask(), w.Info.Resources, needResPrep)
w.lk.Unlock()
go func() {
// first run the prepare step (e.g. fetching sector data from other worker)
tw := sh.workTracker.worker(sw.wid, w.Info, w.workerRpc)
tw.start()
err := req.prepare(req.Ctx, tw)
err := req.prepare.Action(req.Ctx, tw)
w.lk.Lock()
if err != nil {
w.preparing.Free(req.SealTask(), w.Info.Resources, needRes)
w.preparing.Free(req.PrepSealTask(), w.Info.Resources, needResPrep)
w.lk.Unlock()
select {
@ -495,7 +498,7 @@ func (sw *schedWorker) startProcessingTask(req *WorkerRequest) error {
// wait (if needed) for resources in the 'active' window
err = w.active.withResources(sw.wid, w.Info, req.SealTask(), needRes, &w.lk, func() error {
w.preparing.Free(req.SealTask(), w.Info.Resources, needRes)
w.preparing.Free(req.PrepSealTask(), w.Info.Resources, needResPrep)
w.lk.Unlock()
defer w.lk.Lock() // we MUST return locked from this function

View File

@ -36,6 +36,8 @@ const (
TTGenerateWindowPoSt TaskType = "post/v0/windowproof"
TTGenerateWinningPoSt TaskType = "post/v0/winningproof"
TTNoop TaskType = ""
)
var order = map[TaskType]int{

View File

@ -43,9 +43,9 @@ func (m *Manager) WorkerStats(ctx context.Context) map[uuid.UUID]storiface.Worke
TaskCounts: map[string]int{},
}
for tt, count := range handle.active.taskCounters {
handle.active.taskCounters.ForEach(func(tt sealtasks.SealTaskType, count int) {
out[uuid.UUID(id)].TaskCounts[tt.String()] = count
}
})
handle.lk.Unlock()
}

View File

@ -65,6 +65,20 @@ func (wr WorkerResources) ResourceSpec(spt abi.RegisteredSealProof, tt sealtasks
return res
}
// PrepResourceSpec is like ResourceSpec, but is meant for limiting the number of
// parallel prepare tasks.
func (wr WorkerResources) PrepResourceSpec(spt abi.RegisteredSealProof, tt, prepTT sealtasks.TaskType) Resources {
res := wr.ResourceSpec(spt, tt)
if prepTT != tt && prepTT != sealtasks.TTNoop {
prepRes := wr.ResourceSpec(spt, prepTT)
res.MaxConcurrent = prepRes.MaxConcurrent
}
// otherwise, use the default resource table
return res
}
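
The effect of PrepResourceSpec is that only MaxConcurrent is taken from the prepare task's spec; memory and CPU figures still come from the main task. A simplified standalone mirror of that logic (types and table trimmed to the relevant fields, values illustrative):

package main

import "fmt"

type resources struct {
	MinMemory     uint64
	MaxConcurrent int
}

// specFor stands in for the per-task resource table lookup.
func specFor(tt string) resources {
	table := map[string]resources{
		"seal/v0/addpiece": {MinMemory: 8 << 30, MaxConcurrent: 0},
		"seal/v0/fetch":    {MinMemory: 1 << 20, MaxConcurrent: 5},
	}
	return table[tt]
}

// prepResourceSpec mirrors PrepResourceSpec: take the main task's resources,
// but cap concurrency with the prepare task's own limit.
func prepResourceSpec(tt, prepTT string) resources {
	res := specFor(tt)
	if prepTT != tt && prepTT != "" { // "" plays the role of sealtasks.TTNoop
		res.MaxConcurrent = specFor(prepTT).MaxConcurrent
	}
	return res
}

func main() {
	fmt.Printf("%+v\n", prepResourceSpec("seal/v0/addpiece", "seal/v0/fetch"))
	// Output: {MinMemory:8589934592 MaxConcurrent:5}
}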
type WorkerStats struct {
Info WorkerInfo
Tasks []sealtasks.TaskType