Merge pull request #9231 from filecoin-project/fix/datacid-res-envvars
sealing: fix: Make DataCid resource env vars make more sense
This commit is contained in:
commit
088bf56f2a
@ -519,6 +519,7 @@ func (s *SplitStore) applyProtectors() error {
|
||||
// - At this point we are ready to begin purging:
|
||||
// - We sort cold objects heaviest first, so as to never delete the consituents of a DAG before the DAG itself (which would leave dangling references)
|
||||
// - We delete in small batches taking a lock; each batch is checked again for marks, from the concurrent transactional mark, so as to never delete anything live
|
||||
//
|
||||
// - We then end the transaction and compact/gc the hotstore.
|
||||
func (s *SplitStore) compact(curTs *types.TipSet) {
|
||||
log.Info("waiting for active views to complete")
|
||||
|
@ -12,10 +12,9 @@ type unionBlockstore []Blockstore
|
||||
|
||||
// Union returns an unioned blockstore.
|
||||
//
|
||||
// * Reads return from the first blockstore that has the value, querying in the
|
||||
// - Reads return from the first blockstore that has the value, querying in the
|
||||
// supplied order.
|
||||
// * Writes (puts and deletes) are broadcast to all stores.
|
||||
//
|
||||
// - Writes (puts and deletes) are broadcast to all stores.
|
||||
func Union(stores ...Blockstore) Blockstore {
|
||||
return unionBlockstore(stores)
|
||||
}
|
||||
|
Binary file not shown.
Binary file not shown.
@ -5,7 +5,6 @@
|
||||
//
|
||||
// Its purpose is to unlock various degrees of flexibility and parametrization
|
||||
// when writing Testground plans for Lotus.
|
||||
//
|
||||
package build
|
||||
|
||||
import (
|
||||
|
@ -23,6 +23,7 @@ type triggerID = uint64
|
||||
type msgH = abi.ChainEpoch
|
||||
|
||||
// triggerH is the block height at which the listener will be notified about the
|
||||
//
|
||||
// message (msgH+confidence)
|
||||
type triggerH = abi.ChainEpoch
|
||||
|
||||
@ -39,6 +40,7 @@ type EventHandler func(ctx context.Context, data eventData, prevTs, ts *types.Ti
|
||||
//
|
||||
// If `done` is true, timeout won't be triggered
|
||||
// If `more` is false, no messages will be sent to EventHandler (RevertHandler
|
||||
//
|
||||
// may still be called)
|
||||
type CheckFunc func(ctx context.Context, ts *types.TipSet) (done bool, more bool, err error)
|
||||
|
||||
@ -375,29 +377,29 @@ type StateMatchFunc func(oldTs, newTs *types.TipSet) (bool, StateChange, error)
|
||||
// StateChanged registers a callback which is triggered when a specified state
|
||||
// change occurs or a timeout is reached.
|
||||
//
|
||||
// * `CheckFunc` callback is invoked immediately with a recent tipset, it
|
||||
// - `CheckFunc` callback is invoked immediately with a recent tipset, it
|
||||
// returns two booleans - `done`, and `more`.
|
||||
//
|
||||
// * `done` should be true when some on-chain state change we are waiting
|
||||
// - `done` should be true when some on-chain state change we are waiting
|
||||
// for has happened. When `done` is set to true, timeout trigger is disabled.
|
||||
//
|
||||
// * `more` should be false when we don't want to receive new notifications
|
||||
// - `more` should be false when we don't want to receive new notifications
|
||||
// through StateChangeHandler. Note that notifications may still be delivered to
|
||||
// RevertHandler
|
||||
//
|
||||
// * `StateChangeHandler` is called when the specified state change was observed
|
||||
// - `StateChangeHandler` is called when the specified state change was observed
|
||||
// on-chain, and a confidence threshold was reached, or the specified `timeout`
|
||||
// height was reached with no state change observed. When this callback is
|
||||
// invoked on a timeout, `oldTs` and `states are set to nil.
|
||||
// This callback returns a boolean specifying whether further notifications
|
||||
// should be sent, like `more` return param from `CheckFunc` above.
|
||||
//
|
||||
// * `RevertHandler` is called after apply handler, when we drop the tipset
|
||||
// - `RevertHandler` is called after apply handler, when we drop the tipset
|
||||
// containing the message. The tipset passed as the argument is the tipset
|
||||
// that is being dropped. Note that the event dropped may be re-applied
|
||||
// in a different tipset in small amount of time.
|
||||
//
|
||||
// * `StateMatchFunc` is called against each tipset state. If there is a match,
|
||||
// - `StateMatchFunc` is called against each tipset state. If there is a match,
|
||||
// the state change is queued up until the confidence interval has elapsed (and
|
||||
// `StateChangeHandler` is called)
|
||||
func (we *watcherEvents) StateChanged(check CheckFunc, scHnd StateChangeHandler, rev RevertHandler, confidence int, timeout abi.ChainEpoch, mf StateMatchFunc) error {
|
||||
@ -503,31 +505,32 @@ type MsgHandler func(msg *types.Message, rec *types.MessageReceipt, ts *types.Ti
|
||||
type MsgMatchFunc func(msg *types.Message) (matched bool, err error)
|
||||
|
||||
// Called registers a callback which is triggered when a specified method is
|
||||
//
|
||||
// called on an actor, or a timeout is reached.
|
||||
//
|
||||
// * `CheckFunc` callback is invoked immediately with a recent tipset, it
|
||||
// - `CheckFunc` callback is invoked immediately with a recent tipset, it
|
||||
// returns two booleans - `done`, and `more`.
|
||||
//
|
||||
// * `done` should be true when some on-chain action we are waiting for has
|
||||
// - `done` should be true when some on-chain action we are waiting for has
|
||||
// happened. When `done` is set to true, timeout trigger is disabled.
|
||||
//
|
||||
// * `more` should be false when we don't want to receive new notifications
|
||||
// - `more` should be false when we don't want to receive new notifications
|
||||
// through MsgHandler. Note that notifications may still be delivered to
|
||||
// RevertHandler
|
||||
//
|
||||
// * `MsgHandler` is called when the specified event was observed on-chain,
|
||||
// - `MsgHandler` is called when the specified event was observed on-chain,
|
||||
// and a confidence threshold was reached, or the specified `timeout` height
|
||||
// was reached with no events observed. When this callback is invoked on a
|
||||
// timeout, `msg` is set to nil. This callback returns a boolean specifying
|
||||
// whether further notifications should be sent, like `more` return param
|
||||
// from `CheckFunc` above.
|
||||
//
|
||||
// * `RevertHandler` is called after apply handler, when we drop the tipset
|
||||
// - `RevertHandler` is called after apply handler, when we drop the tipset
|
||||
// containing the message. The tipset passed as the argument is the tipset
|
||||
// that is being dropped. Note that the message dropped may be re-applied
|
||||
// in a different tipset in small amount of time.
|
||||
//
|
||||
// * `MsgMatchFunc` is called against each message. If there is a match, the
|
||||
// - `MsgMatchFunc` is called against each message. If there is a match, the
|
||||
// message is queued up until the confidence interval has elapsed (and
|
||||
// `MsgHandler` is called)
|
||||
func (me *messageEvents) Called(ctx context.Context, check CheckFunc, msgHnd MsgHandler, rev RevertHandler, confidence int, timeout abi.ChainEpoch, mf MsgMatchFunc) error {
|
||||
|
@ -21,6 +21,7 @@ const (
|
||||
)
|
||||
|
||||
// FIXME: Bumped from original 800 to this to accommodate `syncFork()`
|
||||
//
|
||||
// use of `GetBlocks()`. It seems the expectation of that API is to
|
||||
// fetch any amount of blocks leaving it to the internal logic here
|
||||
// to partition and reassemble the requests if they go above the maximum.
|
||||
@ -147,10 +148,11 @@ type BSTipSet struct {
|
||||
// `BlsIncludes`/`SecpkIncludes` matches `Bls`/`Secpk` messages
|
||||
// to blocks in the tipsets with the format:
|
||||
// `BlsIncludes[BI][MI]`
|
||||
// * BI: block index in the tipset.
|
||||
// * MI: message index in `Bls` list
|
||||
// - BI: block index in the tipset.
|
||||
// - MI: message index in `Bls` list
|
||||
//
|
||||
// FIXME: The logic to decompress this structure should belong
|
||||
//
|
||||
// to itself, not to the consumer.
|
||||
type CompactedMessages struct {
|
||||
Bls []*types.Message
|
||||
|
@ -9,10 +9,15 @@ import (
|
||||
|
||||
// WrapHeadChangeCoalescer wraps a ReorgNotifee with a head change coalescer.
|
||||
// minDelay is the minimum coalesce delay; when a head change is first received, the coalescer will
|
||||
//
|
||||
// wait for that long to coalesce more head changes.
|
||||
//
|
||||
// maxDelay is the maximum coalesce delay; the coalescer will not delay delivery of a head change
|
||||
//
|
||||
// more than that.
|
||||
//
|
||||
// mergeInterval is the interval that triggers additional coalesce delay; if the last head change was
|
||||
//
|
||||
// within the merge interval when the coalesce timer fires, then the coalesce time is extended
|
||||
// by min delay and up to max delay total.
|
||||
func WrapHeadChangeCoalescer(fn ReorgNotifee, minDelay, maxDelay, mergeInterval time.Duration) ReorgNotifee {
|
||||
|
@ -453,6 +453,7 @@ func (cs *ChainStore) MaybeTakeHeavierTipSet(ctx context.Context, ts *types.TipS
|
||||
// The "fast forward" case is covered in this logic as a valid fork of length 0.
|
||||
//
|
||||
// FIXME: We may want to replace some of the logic in `syncFork()` with this.
|
||||
//
|
||||
// `syncFork()` counts the length on both sides of the fork at the moment (we
|
||||
// need to settle on that) but here we just enforce it on the `synced` side.
|
||||
func (cs *ChainStore) exceedsForkLength(ctx context.Context, synced, external *types.TipSet) (bool, error) {
|
||||
|
@ -159,8 +159,11 @@ func FetchSignedMessagesByCids(
|
||||
}
|
||||
|
||||
// Fetch `cids` from the block service, apply `cb` on each of them. Used
|
||||
//
|
||||
// by the fetch message functions above.
|
||||
//
|
||||
// We check that each block is received only once and we do not received
|
||||
//
|
||||
// blocks we did not request.
|
||||
func fetchCids(
|
||||
ctx context.Context,
|
||||
|
@ -60,14 +60,14 @@ var (
|
||||
// Syncer is in charge of running the chain synchronization logic. As such, it
|
||||
// is tasked with these functions, amongst others:
|
||||
//
|
||||
// * Fast-forwards the chain as it learns of new TipSets from the network via
|
||||
// - Fast-forwards the chain as it learns of new TipSets from the network via
|
||||
// the SyncManager.
|
||||
// * Applies the fork choice rule to select the correct side when confronted
|
||||
// - Applies the fork choice rule to select the correct side when confronted
|
||||
// with a fork in the network.
|
||||
// * Requests block headers and messages from other peers when not available
|
||||
// - Requests block headers and messages from other peers when not available
|
||||
// in our BlockStore.
|
||||
// * Tracks blocks marked as bad in a cache.
|
||||
// * Keeps the BlockStore and ChainStore consistent with our view of the world,
|
||||
// - Tracks blocks marked as bad in a cache.
|
||||
// - Keeps the BlockStore and ChainStore consistent with our view of the world,
|
||||
// the latter of which in turn informs other components when a reorg has been
|
||||
// committed.
|
||||
//
|
||||
|
@ -99,11 +99,11 @@ func tipsetSortFunc(blks []*BlockHeader) func(i, j int) bool {
|
||||
}
|
||||
|
||||
// Checks:
|
||||
// * A tipset is composed of at least one block. (Because of our variable
|
||||
// - A tipset is composed of at least one block. (Because of our variable
|
||||
// number of blocks per tipset, determined by randomness, we do not impose
|
||||
// an upper limit.)
|
||||
// * All blocks have the same height.
|
||||
// * All blocks have the same parents (same number of them and matching CIDs).
|
||||
// - All blocks have the same height.
|
||||
// - All blocks have the same parents (same number of them and matching CIDs).
|
||||
func NewTipSet(blks []*BlockHeader) (*TipSet, error) {
|
||||
if len(blks) == 0 {
|
||||
return nil, xerrors.Errorf("NewTipSet called with zero length array of blocks")
|
||||
|
@ -276,6 +276,7 @@ func (d *Driver) ExecuteMessage(bs blockstore.Blockstore, params ExecuteMessageP
|
||||
// messages that originate from secp256k senders, leaving all
|
||||
// others untouched.
|
||||
// TODO: generate a signature in the DSL so that it's encoded in
|
||||
//
|
||||
// the test vector.
|
||||
func toChainMsg(msg *types.Message) (ret types.ChainMsg) {
|
||||
ret = msg
|
||||
|
@ -4397,26 +4397,26 @@ Response:
|
||||
},
|
||||
"seal/v0/datacid": {
|
||||
"0": {
|
||||
"MinMemory": 2048,
|
||||
"MaxMemory": 2048,
|
||||
"MinMemory": 4294967296,
|
||||
"MaxMemory": 4294967296,
|
||||
"GPUUtilization": 0,
|
||||
"MaxParallelism": 1,
|
||||
"MaxParallelismGPU": 0,
|
||||
"BaseMinMemory": 2048,
|
||||
"BaseMinMemory": 1073741824,
|
||||
"MaxConcurrent": 0
|
||||
},
|
||||
"1": {
|
||||
"MinMemory": 8388608,
|
||||
"MaxMemory": 8388608,
|
||||
"MinMemory": 4294967296,
|
||||
"MaxMemory": 4294967296,
|
||||
"GPUUtilization": 0,
|
||||
"MaxParallelism": 1,
|
||||
"MaxParallelismGPU": 0,
|
||||
"BaseMinMemory": 8388608,
|
||||
"BaseMinMemory": 1073741824,
|
||||
"MaxConcurrent": 0
|
||||
},
|
||||
"2": {
|
||||
"MinMemory": 1073741824,
|
||||
"MaxMemory": 1073741824,
|
||||
"MinMemory": 4294967296,
|
||||
"MaxMemory": 4294967296,
|
||||
"GPUUtilization": 0,
|
||||
"MaxParallelism": 1,
|
||||
"MaxParallelismGPU": 0,
|
||||
@ -4433,8 +4433,8 @@ Response:
|
||||
"MaxConcurrent": 0
|
||||
},
|
||||
"4": {
|
||||
"MinMemory": 8589934592,
|
||||
"MaxMemory": 8589934592,
|
||||
"MinMemory": 4294967296,
|
||||
"MaxMemory": 4294967296,
|
||||
"GPUUtilization": 0,
|
||||
"MaxParallelism": 1,
|
||||
"MaxParallelismGPU": 0,
|
||||
@ -4442,26 +4442,26 @@ Response:
|
||||
"MaxConcurrent": 0
|
||||
},
|
||||
"5": {
|
||||
"MinMemory": 2048,
|
||||
"MaxMemory": 2048,
|
||||
"MinMemory": 4294967296,
|
||||
"MaxMemory": 4294967296,
|
||||
"GPUUtilization": 0,
|
||||
"MaxParallelism": 1,
|
||||
"MaxParallelismGPU": 0,
|
||||
"BaseMinMemory": 2048,
|
||||
"BaseMinMemory": 1073741824,
|
||||
"MaxConcurrent": 0
|
||||
},
|
||||
"6": {
|
||||
"MinMemory": 8388608,
|
||||
"MaxMemory": 8388608,
|
||||
"MinMemory": 4294967296,
|
||||
"MaxMemory": 4294967296,
|
||||
"GPUUtilization": 0,
|
||||
"MaxParallelism": 1,
|
||||
"MaxParallelismGPU": 0,
|
||||
"BaseMinMemory": 8388608,
|
||||
"BaseMinMemory": 1073741824,
|
||||
"MaxConcurrent": 0
|
||||
},
|
||||
"7": {
|
||||
"MinMemory": 1073741824,
|
||||
"MaxMemory": 1073741824,
|
||||
"MinMemory": 4294967296,
|
||||
"MaxMemory": 4294967296,
|
||||
"GPUUtilization": 0,
|
||||
"MaxParallelism": 1,
|
||||
"MaxParallelismGPU": 0,
|
||||
@ -4478,8 +4478,8 @@ Response:
|
||||
"MaxConcurrent": 0
|
||||
},
|
||||
"9": {
|
||||
"MinMemory": 8589934592,
|
||||
"MaxMemory": 8589934592,
|
||||
"MinMemory": 4294967296,
|
||||
"MaxMemory": 4294967296,
|
||||
"GPUUtilization": 0,
|
||||
"MaxParallelism": 1,
|
||||
"MaxParallelismGPU": 0,
|
||||
|
@ -579,26 +579,26 @@ Response:
|
||||
},
|
||||
"seal/v0/datacid": {
|
||||
"0": {
|
||||
"MinMemory": 2048,
|
||||
"MaxMemory": 2048,
|
||||
"MinMemory": 4294967296,
|
||||
"MaxMemory": 4294967296,
|
||||
"GPUUtilization": 0,
|
||||
"MaxParallelism": 1,
|
||||
"MaxParallelismGPU": 0,
|
||||
"BaseMinMemory": 2048,
|
||||
"BaseMinMemory": 1073741824,
|
||||
"MaxConcurrent": 0
|
||||
},
|
||||
"1": {
|
||||
"MinMemory": 8388608,
|
||||
"MaxMemory": 8388608,
|
||||
"MinMemory": 4294967296,
|
||||
"MaxMemory": 4294967296,
|
||||
"GPUUtilization": 0,
|
||||
"MaxParallelism": 1,
|
||||
"MaxParallelismGPU": 0,
|
||||
"BaseMinMemory": 8388608,
|
||||
"BaseMinMemory": 1073741824,
|
||||
"MaxConcurrent": 0
|
||||
},
|
||||
"2": {
|
||||
"MinMemory": 1073741824,
|
||||
"MaxMemory": 1073741824,
|
||||
"MinMemory": 4294967296,
|
||||
"MaxMemory": 4294967296,
|
||||
"GPUUtilization": 0,
|
||||
"MaxParallelism": 1,
|
||||
"MaxParallelismGPU": 0,
|
||||
@ -615,8 +615,8 @@ Response:
|
||||
"MaxConcurrent": 0
|
||||
},
|
||||
"4": {
|
||||
"MinMemory": 8589934592,
|
||||
"MaxMemory": 8589934592,
|
||||
"MinMemory": 4294967296,
|
||||
"MaxMemory": 4294967296,
|
||||
"GPUUtilization": 0,
|
||||
"MaxParallelism": 1,
|
||||
"MaxParallelismGPU": 0,
|
||||
@ -624,26 +624,26 @@ Response:
|
||||
"MaxConcurrent": 0
|
||||
},
|
||||
"5": {
|
||||
"MinMemory": 2048,
|
||||
"MaxMemory": 2048,
|
||||
"MinMemory": 4294967296,
|
||||
"MaxMemory": 4294967296,
|
||||
"GPUUtilization": 0,
|
||||
"MaxParallelism": 1,
|
||||
"MaxParallelismGPU": 0,
|
||||
"BaseMinMemory": 2048,
|
||||
"BaseMinMemory": 1073741824,
|
||||
"MaxConcurrent": 0
|
||||
},
|
||||
"6": {
|
||||
"MinMemory": 8388608,
|
||||
"MaxMemory": 8388608,
|
||||
"MinMemory": 4294967296,
|
||||
"MaxMemory": 4294967296,
|
||||
"GPUUtilization": 0,
|
||||
"MaxParallelism": 1,
|
||||
"MaxParallelismGPU": 0,
|
||||
"BaseMinMemory": 8388608,
|
||||
"BaseMinMemory": 1073741824,
|
||||
"MaxConcurrent": 0
|
||||
},
|
||||
"7": {
|
||||
"MinMemory": 1073741824,
|
||||
"MaxMemory": 1073741824,
|
||||
"MinMemory": 4294967296,
|
||||
"MaxMemory": 4294967296,
|
||||
"GPUUtilization": 0,
|
||||
"MaxParallelism": 1,
|
||||
"MaxParallelismGPU": 0,
|
||||
@ -660,8 +660,8 @@ Response:
|
||||
"MaxConcurrent": 0
|
||||
},
|
||||
"9": {
|
||||
"MinMemory": 8589934592,
|
||||
"MaxMemory": 8589934592,
|
||||
"MinMemory": 4294967296,
|
||||
"MaxMemory": 4294967296,
|
||||
"GPUUtilization": 0,
|
||||
"MaxParallelism": 1,
|
||||
"MaxParallelismGPU": 0,
|
||||
|
@ -108,7 +108,6 @@ func init() {
|
||||
// kit.EnsembleMinimal()
|
||||
// kit.EnsembleOneTwo()
|
||||
// kit.EnsembleTwoOne()
|
||||
//
|
||||
type Ensemble struct {
|
||||
t *testing.T
|
||||
bootstrapped bool
|
||||
|
@ -39,6 +39,7 @@ type ResolveOnce func(ctx context.Context, ds ipld.NodeGetter, nd ipld.Node, nam
|
||||
// Resolver provides path resolution to IPFS
|
||||
// It has a pointer to a DAGService, which is uses to resolve nodes.
|
||||
// TODO: now that this is more modular, try to unify this code with the
|
||||
//
|
||||
// the resolvers in namesys
|
||||
type Resolver struct {
|
||||
DAG ipld.NodeGetter
|
||||
|
@ -27,10 +27,10 @@ func (e *pathError) Path() string {
|
||||
}
|
||||
|
||||
// A Path represents an ipfs content path:
|
||||
// * /<cid>/path/to/file
|
||||
// * /ipfs/<cid>
|
||||
// * /ipns/<cid>/path/to/folder
|
||||
// * etc
|
||||
// - /<cid>/path/to/file
|
||||
// - /ipfs/<cid>
|
||||
// - /ipns/<cid>/path/to/folder
|
||||
// - etc
|
||||
type Path string
|
||||
|
||||
// ^^^
|
||||
|
@ -35,6 +35,7 @@ func NewLineCol(name string) Column {
|
||||
}
|
||||
|
||||
// Unlike text/tabwriter, this works with CLI escape codes, and allows for info
|
||||
//
|
||||
// in separate lines
|
||||
func New(cols ...Column) *TableWriter {
|
||||
return &TableWriter{
|
||||
|
@ -49,6 +49,7 @@ import (
|
||||
var log = logging.Logger("builder")
|
||||
|
||||
// special is a type used to give keys to modules which
|
||||
//
|
||||
// can't really be identified by the returned type
|
||||
type special struct{ id int }
|
||||
|
||||
@ -73,6 +74,7 @@ var (
|
||||
type invoke int
|
||||
|
||||
// Invokes are called in the order they are defined.
|
||||
//
|
||||
//nolint:golint
|
||||
const (
|
||||
// InitJournal at position 0 initializes the journal global var as soon as
|
||||
|
@ -18,6 +18,7 @@ import (
|
||||
)
|
||||
|
||||
// TODO: For now we handle this by halting state execution, when we get jsonrpc reconnecting
|
||||
//
|
||||
// We should implement some wait-for-api logic
|
||||
type ErrApi struct{ error }
|
||||
|
||||
@ -91,6 +92,7 @@ func checkPieces(ctx context.Context, maddr address.Address, si SectorInfo, api
|
||||
}
|
||||
|
||||
// checkPrecommit checks that data commitment generated in the sealing process
|
||||
//
|
||||
// matches pieces, and that the seal ticket isn't expired
|
||||
func checkPrecommit(ctx context.Context, maddr address.Address, si SectorInfo, tsk types.TipSetKey, height abi.ChainEpoch, api SealingAPI) (err error) {
|
||||
if err := checkPieces(ctx, maddr, si, api, false); err != nil {
|
||||
|
@ -261,7 +261,6 @@ func getGrothParamFileAndVerifyingKeys(s abi.SectorSize) {
|
||||
// those parameters and keys. To do this, run the following command:
|
||||
//
|
||||
// go test -run=^TestDownloadParams
|
||||
//
|
||||
func TestDownloadParams(t *testing.T) {
|
||||
// defer requireFDsClosed(t, openFDs(t)) flaky likely cause of how go-embed works with param files
|
||||
|
||||
|
@ -11,6 +11,7 @@ import (
|
||||
)
|
||||
|
||||
// merge gaps between ranges which are close to each other
|
||||
//
|
||||
// TODO: more benchmarking to come up with more optimal number
|
||||
const mergeGaps = 32 << 20
|
||||
|
||||
|
@ -96,6 +96,11 @@ func (a TaskType) WorkerType() string {
|
||||
}
|
||||
}
|
||||
|
||||
// SectorSized returns true if the task operates on a specific sector size
|
||||
func (a TaskType) SectorSized() bool {
|
||||
return a != TTDataCid
|
||||
}
|
||||
|
||||
func (a TaskType) MuchLess(b TaskType) (bool, bool) {
|
||||
oa, ob := order[a], order[b]
|
||||
oneNegative := oa^ob < 0
|
||||
|
@ -9,6 +9,7 @@ import (
|
||||
)
|
||||
|
||||
// ID identifies sector storage by UUID. One sector storage should map to one
|
||||
//
|
||||
// filesystem, local or networked / shared by multiple machines
|
||||
type ID string
|
||||
|
||||
|
@ -6,6 +6,7 @@ import (
|
||||
"strconv"
|
||||
"strings"
|
||||
|
||||
logging "github.com/ipfs/go-log/v2"
|
||||
"golang.org/x/xerrors"
|
||||
|
||||
"github.com/filecoin-project/go-state-types/abi"
|
||||
@ -13,6 +14,8 @@ import (
|
||||
"github.com/filecoin-project/lotus/storage/sealer/sealtasks"
|
||||
)
|
||||
|
||||
var log = logging.Logger("resources")
|
||||
|
||||
type Resources struct {
|
||||
MinMemory uint64 `envname:"MIN_MEMORY"` // What Must be in RAM for decent perf
|
||||
MaxMemory uint64 `envname:"MAX_MEMORY"` // Memory required (swap + ram; peak memory usage during task execution)
|
||||
@ -32,7 +35,6 @@ type Resources struct {
|
||||
}
|
||||
|
||||
/*
|
||||
|
||||
Percent of threads to allocate to parallel tasks
|
||||
|
||||
12 * 0.92 = 11
|
||||
@ -41,7 +43,6 @@ type Resources struct {
|
||||
32 * 0.92 = 29
|
||||
64 * 0.92 = 58
|
||||
128 * 0.92 = 117
|
||||
|
||||
*/
|
||||
var ParallelNum uint64 = 92
|
||||
var ParallelDenom uint64 = 100
|
||||
@ -572,7 +573,12 @@ var ResourceTable = map[sealtasks.TaskType]map[abi.RegisteredSealProof]Resources
|
||||
func init() {
|
||||
ResourceTable[sealtasks.TTUnseal] = ResourceTable[sealtasks.TTPreCommit1] // TODO: measure accurately
|
||||
ResourceTable[sealtasks.TTRegenSectorKey] = ResourceTable[sealtasks.TTReplicaUpdate]
|
||||
ResourceTable[sealtasks.TTDataCid] = ResourceTable[sealtasks.TTAddPiece]
|
||||
|
||||
// DataCid doesn't care about sector proof type; Use 32G AddPiece resource definition
|
||||
ResourceTable[sealtasks.TTDataCid] = map[abi.RegisteredSealProof]Resources{}
|
||||
for proof := range ResourceTable[sealtasks.TTAddPiece] {
|
||||
ResourceTable[sealtasks.TTDataCid][proof] = ResourceTable[sealtasks.TTAddPiece][abi.RegisteredSealProof_StackedDrg32GiBV1]
|
||||
}
|
||||
|
||||
// V1_1 is the same as V1
|
||||
for _, m := range ResourceTable {
|
||||
@ -609,6 +615,9 @@ func ParseResourceEnv(lookup func(key, def string) (string, bool)) (map[sealtask
|
||||
}
|
||||
|
||||
envval, found := lookup(taskType.Short()+"_"+shortSize+"_"+envname, fmt.Sprint(rr.Elem().Field(i).Interface()))
|
||||
if !found {
|
||||
// see if a non-size-specific envvar is set
|
||||
envval, found = lookup(taskType.Short()+"_"+envname, fmt.Sprint(rr.Elem().Field(i).Interface()))
|
||||
if !found {
|
||||
// special multicore SDR handling
|
||||
if (taskType == sealtasks.TTPreCommit1 || taskType == sealtasks.TTUnseal) && envname == "MAX_PARALLELISM" {
|
||||
@ -626,6 +635,12 @@ func ParseResourceEnv(lookup func(key, def string) (string, bool)) (map[sealtask
|
||||
continue
|
||||
}
|
||||
|
||||
} else {
|
||||
if !taskType.SectorSized() {
|
||||
log.Errorw("sector-size independent task resource var specified with sector-sized envvar", "env", taskType.Short()+"_"+shortSize+"_"+envname, "use", taskType.Short()+"_"+envname)
|
||||
}
|
||||
}
|
||||
|
||||
v := rr.Elem().Field(i).Addr().Interface()
|
||||
switch fv := v.(type) {
|
||||
case *uint64:
|
||||
|
@ -12,9 +12,12 @@ import (
|
||||
)
|
||||
|
||||
func TestListResourceVars(t *testing.T) {
|
||||
seen := map[string]struct{}{}
|
||||
_, err := ParseResourceEnv(func(key, def string) (string, bool) {
|
||||
if def != "" {
|
||||
_, s := seen[key]
|
||||
if !s && def != "" {
|
||||
fmt.Printf("%s=%s\n", key, def)
|
||||
seen[key] = struct{}{}
|
||||
}
|
||||
|
||||
return "", false
|
||||
@ -75,3 +78,44 @@ func TestListResourceSDRMulticoreOverride(t *testing.T) {
|
||||
require.Equal(t, 9001, rt[sealtasks.TTPreCommit1][stabi.RegisteredSealProof_StackedDrg2KiBV1_1].MaxParallelism)
|
||||
require.Equal(t, 9001, rt[sealtasks.TTUnseal][stabi.RegisteredSealProof_StackedDrg2KiBV1_1].MaxParallelism)
|
||||
}
|
||||
|
||||
func TestUnsizedSetAll(t *testing.T) {
|
||||
rt, err := ParseResourceEnv(func(key, def string) (string, bool) {
|
||||
if key == "UNS_MAX_PARALLELISM" {
|
||||
return "2", true
|
||||
}
|
||||
|
||||
return "", false
|
||||
})
|
||||
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, 2, rt[sealtasks.TTUnseal][stabi.RegisteredSealProof_StackedDrg2KiBV1_1].MaxParallelism)
|
||||
require.Equal(t, 2, rt[sealtasks.TTUnseal][stabi.RegisteredSealProof_StackedDrg32GiBV1].MaxParallelism)
|
||||
require.Equal(t, 2, rt[sealtasks.TTUnseal][stabi.RegisteredSealProof_StackedDrg8MiBV1].MaxParallelism)
|
||||
|
||||
// check that defaults don't get mutated
|
||||
require.Equal(t, 1, ResourceTable[sealtasks.TTUnseal][stabi.RegisteredSealProof_StackedDrg2KiBV1_1].MaxParallelism)
|
||||
}
|
||||
|
||||
func TestUnsizedNotPreferred(t *testing.T) {
|
||||
rt, err := ParseResourceEnv(func(key, def string) (string, bool) {
|
||||
if key == "DC_MAX_PARALLELISM" {
|
||||
return "2", true
|
||||
}
|
||||
|
||||
// test should also print a warning for DataCid as it's not sector-size dependent
|
||||
if key == "DC_64G_MAX_PARALLELISM" {
|
||||
return "1", true
|
||||
}
|
||||
|
||||
return "", false
|
||||
})
|
||||
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, 2, rt[sealtasks.TTDataCid][stabi.RegisteredSealProof_StackedDrg2KiBV1_1].MaxParallelism)
|
||||
require.Equal(t, 2, rt[sealtasks.TTDataCid][stabi.RegisteredSealProof_StackedDrg32GiBV1].MaxParallelism)
|
||||
require.Equal(t, 1, rt[sealtasks.TTDataCid][stabi.RegisteredSealProof_StackedDrg64GiBV1_1].MaxParallelism)
|
||||
|
||||
// check that defaults don't get mutated
|
||||
require.Equal(t, 1, ResourceTable[sealtasks.TTUnseal][stabi.RegisteredSealProof_StackedDrg2KiBV1_1].MaxParallelism)
|
||||
}
|
||||
|
@ -49,6 +49,7 @@ func init() {
|
||||
// on chain before returning.
|
||||
//
|
||||
// TODO: the waiting should happen in the background. Right now this
|
||||
//
|
||||
// is blocking/delaying the actual generation and submission of WindowPoSts in
|
||||
// this deadline!
|
||||
func (s *WindowPoStScheduler) declareRecoveries(ctx context.Context, dlIdx uint64, partitions []api.Partition, tsk types.TipSetKey) ([][]miner.RecoveryDeclaration, []*types.SignedMessage, error) {
|
||||
@ -205,6 +206,7 @@ func (s *WindowPoStScheduler) declareRecoveries(ctx context.Context, dlIdx uint6
|
||||
// on chain before returning.
|
||||
//
|
||||
// TODO: the waiting should happen in the background. Right now this
|
||||
//
|
||||
// is blocking/delaying the actual generation and submission of WindowPoSts in
|
||||
// this deadline!
|
||||
func (s *WindowPoStScheduler) declareFaults(ctx context.Context, dlIdx uint64, partitions []api.Partition, tsk types.TipSetKey) ([]miner.FaultDeclaration, *types.SignedMessage, error) {
|
||||
|
@ -44,6 +44,7 @@ func getClientMode(groupSeq int64) ClientMode {
|
||||
}
|
||||
|
||||
// TODO Stress is currently WIP. We found blockers in Lotus that prevent us from
|
||||
//
|
||||
// making progress. See https://github.com/filecoin-project/lotus/issues/2297.
|
||||
func Stress(t *testkit.TestEnvironment) error {
|
||||
// Dispatch/forward non-client roles to defaults.
|
||||
|
Loading…
Reference in New Issue
Block a user