Merge branch 'master' into slashfilter

swift-mx committed 2023-05-31 10:02:01 +08:00 (via GitHub)
commit 2fd8602c5a
52 changed files with 1560 additions and 199 deletions


@ -33,6 +33,9 @@ import (
// * Generate openrpc blobs
type Gateway interface {
MpoolPending(context.Context, types.TipSetKey) ([]*types.SignedMessage, error)
ChainGetBlock(context.Context, cid.Cid) (*types.BlockHeader, error)
MinerGetBaseInfo(context.Context, address.Address, abi.ChainEpoch, types.TipSetKey) (*MiningBaseInfo, error)
StateMinerSectorCount(context.Context, address.Address, types.TipSetKey) (MinerSectors, error)
GasEstimateGasPremium(context.Context, uint64, address.Address, int64, types.TipSetKey) (types.BigInt, error)
StateReplay(context.Context, types.TipSetKey, cid.Cid) (*InvocResult, error)
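For context, here is a minimal sketch of calling one of the newly exposed gateway methods from a client. It assumes the lotus JSON-RPC client constructor client.NewGatewayRPCV1 and a reachable gateway endpoint running this change (the URL below is a placeholder); everything else comes from the interface shown above.

```go
package main

import (
	"context"
	"fmt"

	"github.com/filecoin-project/lotus/api/client"
)

func main() {
	ctx := context.Background()

	// Placeholder endpoint; point this at your own lotus-gateway.
	gw, closer, err := client.NewGatewayRPCV1(ctx, "http://127.0.0.1:2346/rpc/v1", nil)
	if err != nil {
		panic(err)
	}
	defer closer()

	head, err := gw.ChainHead(ctx)
	if err != nil {
		panic(err)
	}

	// ChainGetBlock is one of the methods added to the Gateway interface in this commit.
	hdr, err := gw.ChainGetBlock(ctx, head.Cids()[0])
	if err != nil {
		panic(err)
	}
	fmt.Println("miner:", hdr.Miner, "height:", hdr.Height)
}
```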


@ -86,6 +86,7 @@ func init() {
}
ExampleValues[reflect.TypeOf(addr)] = addr
ExampleValues[reflect.TypeOf(&addr)] = &addr
pid, err := peer.Decode("12D3KooWGzxzKZYveHXtpG6AsrUJBcWxHBFS2HsEoGTxrMLvKXtf")
if err != nil {


@ -632,6 +632,8 @@ type GatewayStruct struct {
}
type GatewayMethods struct {
ChainGetBlock func(p0 context.Context, p1 cid.Cid) (*types.BlockHeader, error) ``
ChainGetBlockMessages func(p0 context.Context, p1 cid.Cid) (*BlockMessages, error) ``
ChainGetGenesis func(p0 context.Context) (*types.TipSet, error) ``
@ -734,8 +736,12 @@ type GatewayMethods struct {
GasEstimateMessageGas func(p0 context.Context, p1 *types.Message, p2 *MessageSendSpec, p3 types.TipSetKey) (*types.Message, error) ``
MinerGetBaseInfo func(p0 context.Context, p1 address.Address, p2 abi.ChainEpoch, p3 types.TipSetKey) (*MiningBaseInfo, error) ``
MpoolGetNonce func(p0 context.Context, p1 address.Address) (uint64, error) ``
MpoolPending func(p0 context.Context, p1 types.TipSetKey) ([]*types.SignedMessage, error) ``
MpoolPush func(p0 context.Context, p1 *types.SignedMessage) (cid.Cid, error) ``
MsigGetAvailableBalance func(p0 context.Context, p1 address.Address, p2 types.TipSetKey) (types.BigInt, error) ``
@ -4109,6 +4115,17 @@ func (s *FullNodeStub) Web3ClientVersion(p0 context.Context) (string, error) {
return "", ErrNotSupported
}
func (s *GatewayStruct) ChainGetBlock(p0 context.Context, p1 cid.Cid) (*types.BlockHeader, error) {
if s.Internal.ChainGetBlock == nil {
return nil, ErrNotSupported
}
return s.Internal.ChainGetBlock(p0, p1)
}
func (s *GatewayStub) ChainGetBlock(p0 context.Context, p1 cid.Cid) (*types.BlockHeader, error) {
return nil, ErrNotSupported
}
func (s *GatewayStruct) ChainGetBlockMessages(p0 context.Context, p1 cid.Cid) (*BlockMessages, error) {
if s.Internal.ChainGetBlockMessages == nil {
return nil, ErrNotSupported
@ -4670,6 +4687,17 @@ func (s *GatewayStub) GasEstimateMessageGas(p0 context.Context, p1 *types.Messag
return nil, ErrNotSupported
}
func (s *GatewayStruct) MinerGetBaseInfo(p0 context.Context, p1 address.Address, p2 abi.ChainEpoch, p3 types.TipSetKey) (*MiningBaseInfo, error) {
if s.Internal.MinerGetBaseInfo == nil {
return nil, ErrNotSupported
}
return s.Internal.MinerGetBaseInfo(p0, p1, p2, p3)
}
func (s *GatewayStub) MinerGetBaseInfo(p0 context.Context, p1 address.Address, p2 abi.ChainEpoch, p3 types.TipSetKey) (*MiningBaseInfo, error) {
return nil, ErrNotSupported
}
func (s *GatewayStruct) MpoolGetNonce(p0 context.Context, p1 address.Address) (uint64, error) {
if s.Internal.MpoolGetNonce == nil {
return 0, ErrNotSupported
@ -4681,6 +4709,17 @@ func (s *GatewayStub) MpoolGetNonce(p0 context.Context, p1 address.Address) (uin
return 0, ErrNotSupported
}
func (s *GatewayStruct) MpoolPending(p0 context.Context, p1 types.TipSetKey) ([]*types.SignedMessage, error) {
if s.Internal.MpoolPending == nil {
return *new([]*types.SignedMessage), ErrNotSupported
}
return s.Internal.MpoolPending(p0, p1)
}
func (s *GatewayStub) MpoolPending(p0 context.Context, p1 types.TipSetKey) ([]*types.SignedMessage, error) {
return *new([]*types.SignedMessage), ErrNotSupported
}
func (s *GatewayStruct) MpoolPush(p0 context.Context, p1 *types.SignedMessage) (cid.Cid, error) {
if s.Internal.MpoolPush == nil {
return *new(cid.Cid), ErrNotSupported


@ -299,6 +299,7 @@ type MinerInfo struct {
SectorSize abi.SectorSize
WindowPoStPartitionSectors uint64
ConsensusFaultElapsed abi.ChainEpoch
PendingOwnerAddress *address.Address
Beneficiary address.Address
BeneficiaryTerm *miner.BeneficiaryTerm
PendingBeneficiaryTerm *miner.PendingBeneficiaryChange


@ -35,6 +35,9 @@ import (
// * Generate openrpc blobs
type Gateway interface {
MpoolPending(context.Context, types.TipSetKey) ([]*types.SignedMessage, error)
ChainGetBlock(context.Context, cid.Cid) (*types.BlockHeader, error)
MinerGetBaseInfo(context.Context, address.Address, abi.ChainEpoch, types.TipSetKey) (*api.MiningBaseInfo, error)
StateMinerSectorCount(context.Context, address.Address, types.TipSetKey) (api.MinerSectors, error)
GasEstimateGasPremium(context.Context, uint64, address.Address, int64, types.TipSetKey) (types.BigInt, error)
StateReplay(context.Context, types.TipSetKey, cid.Cid) (*api.InvocResult, error)


@ -431,6 +431,8 @@ type GatewayStruct struct {
}
type GatewayMethods struct {
ChainGetBlock func(p0 context.Context, p1 cid.Cid) (*types.BlockHeader, error) ``
ChainGetBlockMessages func(p0 context.Context, p1 cid.Cid) (*api.BlockMessages, error) ``
ChainGetMessage func(p0 context.Context, p1 cid.Cid) (*types.Message, error) ``
@ -453,8 +455,12 @@ type GatewayMethods struct {
GasEstimateMessageGas func(p0 context.Context, p1 *types.Message, p2 *api.MessageSendSpec, p3 types.TipSetKey) (*types.Message, error) ``
MinerGetBaseInfo func(p0 context.Context, p1 address.Address, p2 abi.ChainEpoch, p3 types.TipSetKey) (*api.MiningBaseInfo, error) ``
MpoolGetNonce func(p0 context.Context, p1 address.Address) (uint64, error) ``
MpoolPending func(p0 context.Context, p1 types.TipSetKey) ([]*types.SignedMessage, error) ``
MpoolPush func(p0 context.Context, p1 *types.SignedMessage) (cid.Cid, error) ``
MsigGetAvailableBalance func(p0 context.Context, p1 address.Address, p2 types.TipSetKey) (types.BigInt, error) ``
@ -2581,6 +2587,17 @@ func (s *FullNodeStub) WalletVerify(p0 context.Context, p1 address.Address, p2 [
return false, ErrNotSupported
}
func (s *GatewayStruct) ChainGetBlock(p0 context.Context, p1 cid.Cid) (*types.BlockHeader, error) {
if s.Internal.ChainGetBlock == nil {
return nil, ErrNotSupported
}
return s.Internal.ChainGetBlock(p0, p1)
}
func (s *GatewayStub) ChainGetBlock(p0 context.Context, p1 cid.Cid) (*types.BlockHeader, error) {
return nil, ErrNotSupported
}
func (s *GatewayStruct) ChainGetBlockMessages(p0 context.Context, p1 cid.Cid) (*api.BlockMessages, error) {
if s.Internal.ChainGetBlockMessages == nil {
return nil, ErrNotSupported
@ -2702,6 +2719,17 @@ func (s *GatewayStub) GasEstimateMessageGas(p0 context.Context, p1 *types.Messag
return nil, ErrNotSupported
}
func (s *GatewayStruct) MinerGetBaseInfo(p0 context.Context, p1 address.Address, p2 abi.ChainEpoch, p3 types.TipSetKey) (*api.MiningBaseInfo, error) {
if s.Internal.MinerGetBaseInfo == nil {
return nil, ErrNotSupported
}
return s.Internal.MinerGetBaseInfo(p0, p1, p2, p3)
}
func (s *GatewayStub) MinerGetBaseInfo(p0 context.Context, p1 address.Address, p2 abi.ChainEpoch, p3 types.TipSetKey) (*api.MiningBaseInfo, error) {
return nil, ErrNotSupported
}
func (s *GatewayStruct) MpoolGetNonce(p0 context.Context, p1 address.Address) (uint64, error) {
if s.Internal.MpoolGetNonce == nil {
return 0, ErrNotSupported
@ -2713,6 +2741,17 @@ func (s *GatewayStub) MpoolGetNonce(p0 context.Context, p1 address.Address) (uin
return 0, ErrNotSupported
}
func (s *GatewayStruct) MpoolPending(p0 context.Context, p1 types.TipSetKey) ([]*types.SignedMessage, error) {
if s.Internal.MpoolPending == nil {
return *new([]*types.SignedMessage), ErrNotSupported
}
return s.Internal.MpoolPending(p0, p1)
}
func (s *GatewayStub) MpoolPending(p0 context.Context, p1 types.TipSetKey) ([]*types.SignedMessage, error) {
return *new([]*types.SignedMessage), ErrNotSupported
}
func (s *GatewayStruct) MpoolPush(p0 context.Context, p1 *types.SignedMessage) (cid.Cid, error) {
if s.Internal.MpoolPush == nil {
return *new(cid.Cid), ErrNotSupported


@ -61,6 +61,10 @@ const (
MaxPreCommitRandomnessLookback = builtin11.EpochsInDay + SealRandomnessLookback
)
var (
MarketDefaultAllocationTermBuffer = market11.MarketDefaultAllocationTermBuffer
)
// SetSupportedProofTypes sets supported proof types, across all actor versions.
// This should only be used for testing.
func SetSupportedProofTypes(types ...abi.RegisteredSealProof) {


@ -39,6 +39,10 @@ const (
MaxPreCommitRandomnessLookback = builtin{{.latestVersion}}.EpochsInDay + SealRandomnessLookback
)
var (
MarketDefaultAllocationTermBuffer = market{{.latestVersion}}.MarketDefaultAllocationTermBuffer
)
// SetSupportedProofTypes sets supported proof types, across all actor versions.
// This should only be used for testing.
func SetSupportedProofTypes(types ...abi.RegisteredSealProof) {


@ -11,7 +11,6 @@ import (
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain/types"
)
@ -28,10 +27,6 @@ func New(dstore ds.Batching) *SlashFilter {
}
func (f *SlashFilter) CheckBlock(ctx context.Context, bh *types.BlockHeader, parentEpoch abi.ChainEpoch) (cid.Cid, error) {
if build.IsNearUpgrade(bh.Height, build.UpgradeOrangeHeight) {
return cid.Undef, nil
}
epochKey := ds.NewKey(fmt.Sprintf("/%s/%d", bh.Miner, bh.Height))
{
// double-fork mining (2 blocks at one epoch)
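For readers new to the slash filter, the hunk above gates the double-fork check that follows it. Below is a rough, hypothetical sketch of that check: the filter keys a datastore by (miner, height) and treats a different block at the same key as a consensus fault, returning the previously stored CID as the witness. The real CheckBlock also performs parent-grinding and time-offset checks not shown here, and its names and error handling differ.

```go
package slashdemo

import (
	"context"
	"fmt"

	"github.com/ipfs/go-cid"
	ds "github.com/ipfs/go-datastore"

	"github.com/filecoin-project/lotus/chain/types"
)

// checkDoubleFork is a simplified, illustrative version of the double-fork
// check: it reports a fault if a different block from the same miner was
// already recorded at the same height.
func checkDoubleFork(ctx context.Context, byEpoch ds.Datastore, bh *types.BlockHeader) (witness cid.Cid, fault bool, err error) {
	epochKey := ds.NewKey(fmt.Sprintf("/%s/%d", bh.Miner, bh.Height))

	have, err := byEpoch.Has(ctx, epochKey)
	if err != nil {
		return cid.Undef, false, err
	}
	if have {
		// A block from this miner is already recorded at this height.
		prevBytes, err := byEpoch.Get(ctx, epochKey)
		if err != nil {
			return cid.Undef, false, err
		}
		_, prev, err := cid.CidFromBytes(prevBytes)
		if err != nil {
			return cid.Undef, false, err
		}
		if prev == bh.Cid() {
			// Same block seen again; not a fault.
			return cid.Undef, false, nil
		}
		// Different block at the same epoch: return the stored CID as evidence.
		return prev, true, nil
	}

	// First block seen for this (miner, height); remember it for later checks.
	return cid.Undef, false, byEpoch.Put(ctx, epochKey, bh.Cid().Bytes())
}
```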


@ -11,9 +11,11 @@ import (
"github.com/ipfs/go-datastore"
logging "github.com/ipfs/go-log/v2"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-state-types/abi"
big2 "github.com/filecoin-project/go-state-types/big"
"github.com/filecoin-project/go-state-types/crypto"
"github.com/filecoin-project/go-state-types/network"
builtin2 "github.com/filecoin-project/specs-actors/v2/actors/builtin"
@ -524,6 +526,36 @@ func TestPruningSimple(t *testing.T) {
}
}
func TestGasRewardNegative(t *testing.T) {
var mp MessagePool
msg := types.SignedMessage{
Message: types.Message{
GasLimit: 1000,
GasFeeCap: big2.NewInt(20000),
GasPremium: big2.NewInt(15000),
},
}
baseFee := big2.NewInt(30000)
// Over the GasPremium, but under the BaseFee
gr1 := mp.getGasReward(&msg, baseFee)
msg.Message.GasFeeCap = big2.NewInt(15000)
// Equal to GasPremium, under the BaseFee
gr2 := mp.getGasReward(&msg, baseFee)
msg.Message.GasFeeCap = big2.NewInt(10000)
// Under both GasPremium and BaseFee
gr3 := mp.getGasReward(&msg, baseFee)
require.True(t, gr1.Sign() < 0)
require.True(t, gr2.Sign() < 0)
require.True(t, gr3.Sign() < 0)
require.True(t, gr1.Cmp(gr2) > 0)
require.True(t, gr2.Cmp(gr3) > 0)
}
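To make the assertions above easier to follow, here is a hypothetical standalone sketch of the effective gas reward they exercise, assuming reward = min(GasPremium, GasFeeCap - BaseFee) * GasLimit; the real (*MessagePool).getGasReward may differ in detail, but this reproduces the sign and ordering the test checks.

```go
package main

import (
	"fmt"

	big2 "github.com/filecoin-project/go-state-types/big"
)

// gasReward computes min(premium, feeCap-baseFee) * gasLimit, which goes
// negative as soon as the fee cap drops below the base fee.
func gasReward(gasLimit int64, feeCap, premium, baseFee big2.Int) big2.Int {
	eff := big2.Sub(feeCap, baseFee)
	if premium.LessThan(eff) {
		eff = premium
	}
	return big2.Mul(eff, big2.NewInt(gasLimit))
}

func main() {
	baseFee := big2.NewInt(30000)
	premium := big2.NewInt(15000)
	for _, feeCap := range []int64{20000, 15000, 10000} {
		// Prints -10000000, -15000000, -20000000: all negative and strictly
		// decreasing, matching gr1 > gr2 > gr3 in the test above.
		fmt.Println(gasReward(1000, big2.NewInt(feeCap), premium, baseFee))
	}
}
```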
func TestLoadLocal(t *testing.T) {
tma := newTestMpoolAPI()
ds := datastore.NewMapDatastore()


@ -13,6 +13,7 @@ import (
"os"
"sort"
"testing"
"time"
"github.com/ipfs/go-cid"
"github.com/ipfs/go-datastore"
@ -1690,3 +1691,188 @@ readLoop:
}
}
func TestRealWorldSelectionTiming(t *testing.T) {
//stm: @TOKEN_WALLET_NEW_001, @TOKEN_WALLET_SIGN_001, @CHAIN_MEMPOOL_SELECT_001
// load test-messages.json.gz and rewrite the messages so that
// 1) we map each real actor to a test actor so that we can sign the messages
// 2) adjust the nonces so that they start from 0
file, err := os.Open("test-messages2.json.gz")
if err != nil {
t.Fatal(err)
}
gzr, err := gzip.NewReader(file)
if err != nil {
t.Fatal(err)
}
dec := json.NewDecoder(gzr)
var msgs []*types.SignedMessage
baseNonces := make(map[address.Address]uint64)
readLoop:
for {
m := new(types.SignedMessage)
err := dec.Decode(m)
switch err {
case nil:
msgs = append(msgs, m)
nonce, ok := baseNonces[m.Message.From]
if !ok || m.Message.Nonce < nonce {
baseNonces[m.Message.From] = m.Message.Nonce
}
case io.EOF:
break readLoop
default:
t.Fatal(err)
}
}
actorMap := make(map[address.Address]address.Address)
actorWallets := make(map[address.Address]api.Wallet)
for _, m := range msgs {
baseNonce := baseNonces[m.Message.From]
localActor, ok := actorMap[m.Message.From]
if !ok {
w, err := wallet.NewWallet(wallet.NewMemKeyStore())
if err != nil {
t.Fatal(err)
}
a, err := w.WalletNew(context.Background(), types.KTSecp256k1)
if err != nil {
t.Fatal(err)
}
actorMap[m.Message.From] = a
actorWallets[a] = w
localActor = a
}
w, ok := actorWallets[localActor]
if !ok {
t.Fatalf("failed to lookup wallet for actor %s", localActor)
}
m.Message.From = localActor
m.Message.Nonce -= baseNonce
sig, err := w.WalletSign(context.TODO(), localActor, m.Message.Cid().Bytes(), api.MsgMeta{})
if err != nil {
t.Fatal(err)
}
m.Signature = *sig
}
mp, tma := makeTestMpool()
block := tma.nextBlockWithHeight(uint64(build.UpgradeHyggeHeight) + 10)
ts := mock.TipSet(block)
tma.applyBlock(t, block)
for _, a := range actorMap {
tma.setBalance(a, 1000000)
}
tma.baseFee = types.NewInt(800_000_000)
sort.Slice(msgs, func(i, j int) bool {
return msgs[i].Message.Nonce < msgs[j].Message.Nonce
})
// add the messages
for _, m := range msgs {
mustAdd(t, mp, m)
}
// do message selection and check block packing
minGasLimit := int64(0.9 * float64(build.BlockGasLimit))
// greedy first
start := time.Now()
selected, err := mp.SelectMessages(context.Background(), ts, 1.0)
if err != nil {
t.Fatal(err)
}
t.Logf("selected %d messages in %s", len(selected), time.Since(start))
gasLimit := int64(0)
for _, m := range selected {
gasLimit += m.Message.GasLimit
}
if gasLimit < minGasLimit {
t.Fatalf("failed to pack with tq=1.0; packed %d, minimum packing: %d", gasLimit, minGasLimit)
}
// high quality ticket
start = time.Now()
selected, err = mp.SelectMessages(context.Background(), ts, .8)
if err != nil {
t.Fatal(err)
}
t.Logf("selected %d messages in %s", len(selected), time.Since(start))
gasLimit = int64(0)
for _, m := range selected {
gasLimit += m.Message.GasLimit
}
if gasLimit < minGasLimit {
t.Fatalf("failed to pack with tq=0.8; packed %d, minimum packing: %d", gasLimit, minGasLimit)
}
// mid quality ticket
start = time.Now()
selected, err = mp.SelectMessages(context.Background(), ts, .4)
if err != nil {
t.Fatal(err)
}
t.Logf("selected %d messages in %s", len(selected), time.Since(start))
gasLimit = int64(0)
for _, m := range selected {
gasLimit += m.Message.GasLimit
}
if gasLimit < minGasLimit {
t.Fatalf("failed to pack with tq=0.4; packed %d, minimum packing: %d", gasLimit, minGasLimit)
}
// low quality ticket
start = time.Now()
selected, err = mp.SelectMessages(context.Background(), ts, .1)
if err != nil {
t.Fatal(err)
}
t.Logf("selected %d messages in %s", len(selected), time.Since(start))
gasLimit = int64(0)
for _, m := range selected {
gasLimit += m.Message.GasLimit
}
if gasLimit < minGasLimit {
t.Fatalf("failed to pack with tq=0.1; packed %d, minimum packing: %d", gasLimit, minGasLimit)
}
// very low quality ticket
start = time.Now()
selected, err = mp.SelectMessages(context.Background(), ts, .01)
if err != nil {
t.Fatal(err)
}
t.Logf("selected %d messages in %s", len(selected), time.Since(start))
gasLimit = int64(0)
for _, m := range selected {
gasLimit += m.Message.GasLimit
}
if gasLimit < minGasLimit {
t.Fatalf("failed to pack with tq=0.01; packed %d, minimum packing: %d", gasLimit, minGasLimit)
}
}


@ -106,6 +106,7 @@ func main() {
sealBenchCmd,
simpleCmd,
importBenchCmd,
rpcCmd,
},
}

cmd/lotus-bench/rpc.go (new file, 576 lines)

@ -0,0 +1,576 @@
package main
import (
"bytes"
"encoding/json"
"errors"
"fmt"
"io"
"net/http"
"os"
"os/signal"
"sort"
"strconv"
"strings"
"sync"
"text/tabwriter"
"time"
"github.com/urfave/cli/v2"
)
var rpcCmd = &cli.Command{
Name: "rpc",
Usage: "Runs a concurrent stress test on one or more rpc methods and prints the performance metrics including latency distribution and histogram",
Description: `This benchmark is designed to stress test the rpc methods of a lotus node so that we can simulate real world usage and measure the performance of each method on the node.
This benchmark has the following features:
* Can query each method both sequentially and concurrently
* Supports rate limiting
* Can query multiple different methods at once (supporting a different concurrency level and rate limit for each method)
* Gives a nice reporting summary of the stress testing of each method (including latency distribution, histogram and more)
* Easy to use
To use this benchmark you must specify the rpc methods you want to test using the --method option; its format is:
--method=NAME[:CONCURRENCY][:QPS][:PARAMS] where only NAME is required.
Here are some real examples:
lotus-bench rpc --method='eth_chainId' // run eth_chainId with default concurrency and qps
lotus-bench rpc --method='eth_chainId:3' // override concurrency to 3
lotus-bench rpc --method='eth_chainId::100' // override to 100 qps while using default concurrency
lotus-bench rpc --method='eth_chainId:3:100' // run using 3 workers but limit to 100 qps
lotus-bench rpc --method='eth_getTransactionCount:::["0xd4c70007F3F502f212c7e6794b94C06F36173B36", "latest"]' // run using optional params while using default concurrency and qps
lotus-bench rpc --method='eth_chainId' --method='eth_getTransactionCount:10:0:["0xd4c70007F3F502f212c7e6794b94C06F36173B36", "latest"]' // run multiple methods at once
NOTE: The last two examples will not work until we upgrade urfave dependency (tracked in https://github.com/urfave/cli/issues/1734)`,
Flags: []cli.Flag{
&cli.StringFlag{
Name: "endpoint",
Value: "http://127.0.0.1:1234/rpc/v1",
Usage: "The rpc endpoint to benchmark",
},
&cli.DurationFlag{
Name: "duration",
Value: 60 * time.Second,
Usage: "Duration of benchmark in seconds",
},
&cli.IntFlag{
Name: "concurrency",
Value: 10,
Usage: "How many workers should be used per rpc method (can be overridden per method)",
},
&cli.IntFlag{
Name: "qps",
Value: 0,
Usage: "How many requests per second should be sent per rpc method (can be overridden per method), a value of 0 means no limit",
},
&cli.StringSliceFlag{
Name: "method",
Usage: `Method to benchmark; you can specify multiple methods by repeating this flag. You can also specify method-specific options to set the concurrency and qps for each method (see usage).
`,
},
&cli.DurationFlag{
Name: "watch",
Value: 0 * time.Second,
Usage: "If >0 then generates reports every N seconds (only supports linux/unix)",
},
&cli.BoolFlag{
Name: "print-response",
Value: false,
Usage: "print the response of each request",
},
},
Action: func(cctx *cli.Context) error {
if len(cctx.StringSlice("method")) == 0 {
return errors.New("you must specify at least one method to benchmark")
}
var rpcMethods []*RPCMethod
for _, str := range cctx.StringSlice("method") {
entries := strings.SplitN(str, ":", 4)
if len(entries) == 0 {
return errors.New("invalid method format")
}
// check if concurrency was specified
concurrency := cctx.Int("concurrency")
if len(entries) > 1 {
if len(entries[1]) > 0 {
var err error
concurrency, err = strconv.Atoi(entries[1])
if err != nil {
return fmt.Errorf("could not parse concurrency value from method %s: %v", entries[0], err)
}
}
}
// check if qps was specified
qps := cctx.Int("qps")
if len(entries) > 2 {
if len(entries[2]) > 0 {
var err error
qps, err = strconv.Atoi(entries[2])
if err != nil {
return fmt.Errorf("could not parse qps value from method %s: %v", entries[0], err)
}
}
}
// check if params was specified
params := "[]"
if len(entries) > 3 {
params = entries[3]
}
rpcMethods = append(rpcMethods, &RPCMethod{
w: os.Stdout,
uri: cctx.String("endpoint"),
method: entries[0],
concurrency: concurrency,
qps: qps,
params: params,
printResp: cctx.Bool("print-response"),
})
}
// terminate early on ctrl+c
c := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt)
go func() {
<-c
fmt.Println("Received interrupt, stopping...")
for _, method := range rpcMethods {
method.Stop()
}
}()
// stop all threads after duration
go func() {
time.Sleep(cctx.Duration("duration"))
for _, e := range rpcMethods {
e.Stop()
}
}()
// start all threads
var wg sync.WaitGroup
wg.Add(len(rpcMethods))
for _, e := range rpcMethods {
go func(e *RPCMethod) {
defer wg.Done()
err := e.Run()
if err != nil {
fmt.Printf("error running rpc method: %v\n", err)
}
}(e)
}
// if watch is set then print a report every N seconds
var progressCh chan struct{}
if cctx.Duration("watch") > 0 {
progressCh = make(chan struct{}, 1)
go func(progressCh chan struct{}) {
ticker := time.NewTicker(cctx.Duration("watch"))
for {
clearAndPrintReport := func() {
// clear the screen and move the cursor to the top left
fmt.Print("\033[2J")
fmt.Printf("\033[%d;%dH", 1, 1)
for i, e := range rpcMethods {
e.Report()
if i < len(rpcMethods)-1 {
fmt.Println()
}
}
}
select {
case <-ticker.C:
clearAndPrintReport()
case <-progressCh:
clearAndPrintReport()
return
}
}
}(progressCh)
}
wg.Wait()
if progressCh != nil {
// wait for the watch go routine to return
progressCh <- struct{}{}
// no need to print the report again
return nil
}
// print the report for each endpoint
for i, e := range rpcMethods {
e.Report()
if i < len(rpcMethods)-1 {
fmt.Println()
}
}
return nil
},
}
// RPCMethod handles the benchmarking of a single endpoint method.
type RPCMethod struct {
w io.Writer
// the endpoint uri
uri string
// the rpc method we want to benchmark
method string
// the number of concurrent requests to make to this endpoint
concurrency int
// if >0, qps is the max number of requests per second to make to this endpoint (0 = no limit)
qps int
// many endpoints require specific parameters to be passed
params string
// whether or not to print the response of each request (useful for debugging)
printResp bool
// instruct the worker go routines to stop
stopCh chan struct{}
// when the endpoint benchmarking started
start time.Time
// results channel is used by the workers to send results to the reporter
results chan *result
// reporter handles reading the results from workers and printing the report statistics
reporter *Reporter
}
// result is the result of a single rpc method request.
type result struct {
err error
statusCode *int
duration time.Duration
}
func (rpc *RPCMethod) Run() error {
client := &http.Client{
Timeout: 0,
}
var wg sync.WaitGroup
wg.Add(rpc.concurrency)
rpc.results = make(chan *result, rpc.concurrency*1_000)
rpc.stopCh = make(chan struct{}, rpc.concurrency)
go func() {
rpc.reporter = NewReporter(rpc.results, rpc.w)
rpc.reporter.Run()
}()
rpc.start = time.Now()
// throttle the number of requests per second
var qpsTicker *time.Ticker
if rpc.qps > 0 {
qpsTicker = time.NewTicker(time.Second / time.Duration(rpc.qps))
}
for i := 0; i < rpc.concurrency; i++ {
go func() {
rpc.startWorker(client, qpsTicker)
wg.Done()
}()
}
wg.Wait()
// close the results channel so reporter will stop
close(rpc.results)
// wait until the reporter is done
<-rpc.reporter.doneCh
return nil
}
func (rpc *RPCMethod) startWorker(client *http.Client, qpsTicker *time.Ticker) {
for {
// check if we should stop
select {
case <-rpc.stopCh:
return
default:
}
// wait for the next tick if we are rate limiting this endpoint
if qpsTicker != nil {
<-qpsTicker.C
}
req, err := rpc.buildRequest()
if err != nil {
log.Fatalln(err)
}
start := time.Now()
var statusCode *int
// send the request to the endpoint
resp, err := client.Do(req)
if err != nil {
err = fmt.Errorf("HTTP error: %s", err.Error())
} else {
statusCode = &resp.StatusCode
// there was no HTTP error, but we still need to check the json response for errors
var data []byte
data, err = io.ReadAll(resp.Body)
if err != nil {
log.Fatalln(err)
}
// we are only interested in whether the response contains an error field
type respData struct {
Error struct {
Code int `json:"code"`
Message string `json:"message"`
} `json:"error"`
}
// unmarshal the response into a struct so we can check for errors
var d respData
err = json.Unmarshal(data, &d)
if err != nil {
log.Fatalln(err)
}
// if the response has an error json message then it should be considered an error just like any http error
if len(d.Error.Message) > 0 {
// truncate the error message if it is too long
if len(d.Error.Message) > 1000 {
d.Error.Message = d.Error.Message[:1000] + "..."
}
// remove newlines from the error message so we don't screw up the report
d.Error.Message = strings.ReplaceAll(d.Error.Message, "\n", "")
err = fmt.Errorf("JSON error: code:%d, message:%s", d.Error.Code, d.Error.Message)
}
if rpc.printResp {
fmt.Printf("[%s] %s", rpc.method, string(data))
}
resp.Body.Close() //nolint:errcheck
}
rpc.results <- &result{
statusCode: statusCode,
err: err,
duration: time.Since(start),
}
}
}
func (rpc *RPCMethod) buildRequest() (*http.Request, error) {
jreq, err := json.Marshal(struct {
Jsonrpc string `json:"jsonrpc"`
ID int `json:"id"`
Method string `json:"method"`
Params json.RawMessage `json:"params"`
}{
Jsonrpc: "2.0",
Method: rpc.method,
Params: json.RawMessage(rpc.params),
ID: 0,
})
if err != nil {
return nil, err
}
req, err := http.NewRequest("POST", rpc.uri, bytes.NewReader(jreq))
if err != nil {
return nil, err
}
req.Header.Set("Accept", "application/json")
return req, nil
}
func (rpc *RPCMethod) Stop() {
for i := 0; i < rpc.concurrency; i++ {
rpc.stopCh <- struct{}{}
}
}
func (rpc *RPCMethod) Report() {
total := time.Since(rpc.start)
fmt.Fprintf(rpc.w, "[%s]:\n", rpc.method)
fmt.Fprintf(rpc.w, "- Options:\n")
fmt.Fprintf(rpc.w, " - concurrency: %d\n", rpc.concurrency)
fmt.Fprintf(rpc.w, " - params: %s\n", rpc.params)
fmt.Fprintf(rpc.w, " - qps: %d\n", rpc.qps)
rpc.reporter.Print(total, rpc.w)
}
// Reporter reads the results from the workers through the results channel and aggregates the results.
type Reporter struct {
// write the report to this writer
w io.Writer
// the reporter reads the results from this channel
results chan *result
// doneCh is used to signal that the reporter has finished reading the results (channel has closed)
doneCh chan bool
// lock protects the following fields during critical sections (if --watch was specified)
lock sync.Mutex
// the latencies of all requests
latencies []int64
// the number of requests that returned each status code
statusCodes map[int]int
// the number of errors that occurred
errors map[string]int
}
func NewReporter(results chan *result, w io.Writer) *Reporter {
return &Reporter{
w: w,
results: results,
doneCh: make(chan bool, 1),
statusCodes: make(map[int]int),
errors: make(map[string]int),
}
}
func (r *Reporter) Run() {
for res := range r.results {
r.lock.Lock()
r.latencies = append(r.latencies, res.duration.Milliseconds())
if res.statusCode != nil {
r.statusCodes[*res.statusCode]++
}
if res.err != nil {
if len(r.errors) < 1_000_000 {
r.errors[res.err.Error()]++
} else {
// we don't want to store too many errors in memory
r.errors["hidden"]++
}
} else {
r.errors["nil"]++
}
r.lock.Unlock()
}
r.doneCh <- true
}
func (r *Reporter) Print(elapsed time.Duration, w io.Writer) {
r.lock.Lock()
defer r.lock.Unlock()
nrReq := int64(len(r.latencies))
if nrReq == 0 {
fmt.Println("No requests were made")
return
}
// we need to sort the latencies slice to calculate the percentiles
sort.Slice(r.latencies, func(i, j int) bool {
return r.latencies[i] < r.latencies[j]
})
var totalLatency int64 = 0
for _, latency := range r.latencies {
totalLatency += latency
}
fmt.Fprintf(w, "- Total Requests: %d\n", nrReq)
fmt.Fprintf(w, "- Total Duration: %dms\n", elapsed.Milliseconds())
fmt.Fprintf(w, "- Requests/sec: %f\n", float64(nrReq)/elapsed.Seconds())
fmt.Fprintf(w, "- Avg latency: %dms\n", totalLatency/nrReq)
fmt.Fprintf(w, "- Median latency: %dms\n", r.latencies[nrReq/2])
fmt.Fprintf(w, "- Latency distribution:\n")
percentiles := []float64{0.1, 0.5, 0.9, 0.95, 0.99, 0.999}
for _, p := range percentiles {
idx := int64(p * float64(nrReq))
fmt.Fprintf(w, " %s%% in %dms\n", fmt.Sprintf("%.2f", p*100.0), r.latencies[idx])
}
// create a simple histogram with 10 buckets spanning the range of latency
// into equal ranges
//
nrBucket := 10
buckets := make([]Bucket, nrBucket)
latencyRange := r.latencies[len(r.latencies)-1]
bucketRange := latencyRange / int64(nrBucket)
// mark the end of each bucket
for i := 0; i < nrBucket; i++ {
buckets[i].start = int64(i) * bucketRange
buckets[i].end = buckets[i].start + bucketRange
// extend the last bucket by any remaining range caused by the integer division
if i == nrBucket-1 {
buckets[i].end = latencyRange
}
}
// count the number of requests in each bucket
currBucket := 0
for i := 0; i < len(r.latencies); {
if r.latencies[i] <= buckets[currBucket].end {
buckets[currBucket].cnt++
i++
} else {
currBucket++
}
}
// print the histogram using a tabwriter which will align the columns nicely
fmt.Fprintf(w, "- Histogram:\n")
const padding = 2
tabWriter := tabwriter.NewWriter(w, 0, 0, padding, ' ', tabwriter.AlignRight|tabwriter.Debug)
for i := 0; i < nrBucket; i++ {
ratio := float64(buckets[i].cnt) / float64(nrReq)
bars := strings.Repeat("#", int(ratio*100))
fmt.Fprintf(tabWriter, " %d-%dms\t%d\t%s (%s%%)\n", buckets[i].start, buckets[i].end, buckets[i].cnt, bars, fmt.Sprintf("%.2f", ratio*100))
}
tabWriter.Flush() //nolint:errcheck
fmt.Fprintf(w, "- Status codes:\n")
for code, cnt := range r.statusCodes {
fmt.Fprintf(w, " [%d]: %d\n", code, cnt)
}
// print the 10 most occurring errors (in case error values are not unique)
//
type kv struct {
err string
cnt int
}
var sortedErrors []kv
for err, cnt := range r.errors {
sortedErrors = append(sortedErrors, kv{err, cnt})
}
sort.Slice(sortedErrors, func(i, j int) bool {
return sortedErrors[i].cnt > sortedErrors[j].cnt
})
fmt.Fprintf(w, "- Errors (top 10):\n")
for i, se := range sortedErrors {
if i >= 10 {
break
}
fmt.Fprintf(w, " [%s]: %d\n", se.err, se.cnt)
}
}
type Bucket struct {
start int64
// the end value of the bucket
end int64
// how many entries are in the bucket
cnt int
}
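For reference, the wire format produced by buildRequest above is a plain JSON-RPC 2.0 envelope. The short standalone sketch below marshals the same struct shape with example inputs (eth_chainId, empty params) and prints the body that would be POSTed to the endpoint.

```go
package main

import (
	"encoding/json"
	"fmt"
)

func main() {
	// Mirrors the payload assembled in (*RPCMethod).buildRequest.
	body, err := json.Marshal(struct {
		Jsonrpc string          `json:"jsonrpc"`
		ID      int             `json:"id"`
		Method  string          `json:"method"`
		Params  json.RawMessage `json:"params"`
	}{
		Jsonrpc: "2.0",
		ID:      0,
		Method:  "eth_chainId",
		Params:  json.RawMessage(`[]`),
	})
	if err != nil {
		panic(err)
	}
	fmt.Println(string(body))
	// Output: {"jsonrpc":"2.0","id":0,"method":"eth_chainId","params":[]}
}
```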


@ -2,6 +2,7 @@ package main
import (
"bufio"
"encoding/csv"
"encoding/json"
"errors"
"fmt"
@ -319,6 +320,9 @@ var sectorsListCmd = &cli.Command{
Value: parallelSectorChecks,
},
},
Subcommands: []*cli.Command{
sectorsListUpgradeBoundsCmd,
},
Action: func(cctx *cli.Context) error {
// http mode allows for parallel json decoding/encoding, which was a bottleneck here
minerApi, closer, err := lcli.GetStorageMinerAPI(cctx, cliutil.StorageMinerUseHttp)
@ -586,6 +590,169 @@ var sectorsListCmd = &cli.Command{
},
}
var sectorsListUpgradeBoundsCmd = &cli.Command{
Name: "upgrade-bounds",
Usage: "Output upgrade bounds for available sectors",
Flags: []cli.Flag{
&cli.IntFlag{
Name: "buckets",
Value: 25,
},
&cli.BoolFlag{
Name: "csv",
Usage: "output machine-readable values",
},
&cli.BoolFlag{
Name: "deal-terms",
Usage: "bucket by how many deal-sectors can start at a given expiration",
},
},
Action: func(cctx *cli.Context) error {
minerApi, closer, err := lcli.GetStorageMinerAPI(cctx, cliutil.StorageMinerUseHttp)
if err != nil {
return err
}
defer closer()
fullApi, closer2, err := lcli.GetFullNodeAPI(cctx)
if err != nil {
return err
}
defer closer2()
ctx := lcli.ReqContext(cctx)
list, err := minerApi.SectorsListInStates(ctx, []api.SectorState{
api.SectorState(sealing.Available),
})
if err != nil {
return xerrors.Errorf("getting sector list: %w", err)
}
head, err := fullApi.ChainHead(ctx)
if err != nil {
return xerrors.Errorf("getting chain head: %w", err)
}
filter := bitfield.New()
for _, s := range list {
filter.Set(uint64(s))
}
maddr, err := minerApi.ActorAddress(ctx)
if err != nil {
return err
}
sset, err := fullApi.StateMinerSectors(ctx, maddr, &filter, head.Key())
if err != nil {
return err
}
if len(sset) == 0 {
return nil
}
var minExpiration, maxExpiration abi.ChainEpoch
for _, s := range sset {
if s.Expiration < minExpiration || minExpiration == 0 {
minExpiration = s.Expiration
}
if s.Expiration > maxExpiration {
maxExpiration = s.Expiration
}
}
buckets := cctx.Int("buckets")
bucketSize := (maxExpiration - minExpiration) / abi.ChainEpoch(buckets)
bucketCounts := make([]int, buckets+1)
for b := range bucketCounts {
bucketMin := minExpiration + abi.ChainEpoch(b)*bucketSize
bucketMax := minExpiration + abi.ChainEpoch(b+1)*bucketSize
if cctx.Bool("deal-terms") {
bucketMax = bucketMax + policy.MarketDefaultAllocationTermBuffer
}
for _, s := range sset {
isInBucket := s.Expiration >= bucketMin && s.Expiration < bucketMax
if isInBucket {
bucketCounts[b]++
}
}
}
// Creating CSV writer
writer := csv.NewWriter(os.Stdout)
// Writing CSV headers
err = writer.Write([]string{"Max Expiration in Bucket", "Sector Count"})
if err != nil {
return xerrors.Errorf("writing csv headers: %w", err)
}
// Writing bucket details
if cctx.Bool("csv") {
for i := 0; i < buckets; i++ {
maxExp := minExpiration + abi.ChainEpoch(i+1)*bucketSize
timeStr := strconv.FormatInt(int64(maxExp), 10)
err = writer.Write([]string{
timeStr,
strconv.Itoa(bucketCounts[i]),
})
if err != nil {
return xerrors.Errorf("writing csv row: %w", err)
}
}
// Flush to make sure all data is written to the underlying writer
writer.Flush()
if err := writer.Error(); err != nil {
return xerrors.Errorf("flushing csv writer: %w", err)
}
return nil
}
tw := tablewriter.New(
tablewriter.Col("Bucket Expiration"),
tablewriter.Col("Sector Count"),
tablewriter.Col("Bar"),
)
var barCols = 40
var maxCount int
for _, c := range bucketCounts {
if c > maxCount {
maxCount = c
}
}
for i := 0; i < buckets; i++ {
maxExp := minExpiration + abi.ChainEpoch(i+1)*bucketSize
timeStr := cliutil.EpochTime(head.Height(), maxExp)
tw.Write(map[string]interface{}{
"Bucket Expiration": timeStr,
"Sector Count": color.YellowString("%d", bucketCounts[i]),
"Bar": "[" + color.GreenString(strings.Repeat("|", bucketCounts[i]*barCols/maxCount)) + strings.Repeat(" ", barCols-bucketCounts[i]*barCols/maxCount) + "]",
})
}
return tw.Flush(os.Stdout)
},
}
var sectorsRefsCmd = &cli.Command{
Name: "refs",
Usage: "List References to sectors",


@ -653,7 +653,7 @@ fr32 padding is removed from the output.`,
return xerrors.Errorf("getting reader: %w", err)
}
rd, err := readStarter(0)
rd, err := readStarter(0, storiface.PaddedByteIndex(length))
if err != nil {
return xerrors.Errorf("starting reader: %w", err)
}


@ -4202,7 +4202,7 @@ Inputs:
Response:
```json
{
"Channel": "\u003cempty\u003e",
"Channel": "f01234",
"From": "f01234",
"To": "f01234",
"ConfirmedAmt": "0",
@ -4233,7 +4233,7 @@ Inputs:
Response:
```json
{
"Channel": "\u003cempty\u003e",
"Channel": "f01234",
"From": "f01234",
"To": "f01234",
"ConfirmedAmt": "0",
@ -4953,7 +4953,7 @@ Response:
},
"Nonce": 42,
"Balance": "0",
"Address": "\u003cempty\u003e"
"Address": "f01234"
}
}
```
@ -5242,7 +5242,7 @@ Response:
},
"Nonce": 42,
"Balance": "0",
"Address": "\u003cempty\u003e"
"Address": "f01234"
}
```
@ -5985,6 +5985,7 @@ Response:
"SectorSize": 34359738368,
"WindowPoStPartitionSectors": 42,
"ConsensusFaultElapsed": 10101,
"PendingOwnerAddress": "f01234",
"Beneficiary": "f01234",
"BeneficiaryTerm": {
"Quota": "0",


@ -5576,7 +5576,7 @@ Inputs:
Response:
```json
{
"Channel": "\u003cempty\u003e",
"Channel": "f01234",
"From": "f01234",
"To": "f01234",
"ConfirmedAmt": "0",
@ -5607,7 +5607,7 @@ Inputs:
Response:
```json
{
"Channel": "\u003cempty\u003e",
"Channel": "f01234",
"From": "f01234",
"To": "f01234",
"ConfirmedAmt": "0",
@ -6390,7 +6390,7 @@ Response:
},
"Nonce": 42,
"Balance": "0",
"Address": "\u003cempty\u003e"
"Address": "f01234"
}
}
```
@ -6730,7 +6730,7 @@ Response:
},
"Nonce": 42,
"Balance": "0",
"Address": "\u003cempty\u003e"
"Address": "f01234"
}
```
@ -7505,6 +7505,7 @@ Response:
"SectorSize": 34359738368,
"WindowPoStPartitionSectors": 42,
"ConsensusFaultElapsed": 10101,
"PendingOwnerAddress": "f01234",
"Beneficiary": "f01234",
"BeneficiaryTerm": {
"Quota": "0",


@ -654,17 +654,37 @@ NAME:
lotus-miner sectors list - List sectors
USAGE:
lotus-miner sectors list [command options] [arguments...]
lotus-miner sectors list command [command options] [arguments...]
COMMANDS:
upgrade-bounds Output upgrade bounds for available sectors
help, h Shows a list of commands or help for one command
OPTIONS:
--check-parallelism value number of parallel requests to make for checking sector states (default: 300)
--events, -e display number of events the sector has received (default: false)
--show-removed, -r show removed sectors (default: false)
--fast, -f don't show on-chain info for better performance (default: false)
--events, -e display number of events the sector has received (default: false)
--initial-pledge, -p display initial pledge (default: false)
--seal-time, -t display how long it took for the sector to be sealed (default: false)
--show-removed, -r show removed sectors (default: false)
--states value filter sectors by a comma-separated list of states
--unproven, -u only show sectors which aren't in the 'Proving' state (default: false)
--check-parallelism value number of parallel requests to make for checking sector states (default: 300)
--help, -h show help (default: false)
```
#### lotus-miner sectors list upgrade-bounds
```
NAME:
lotus-miner sectors list upgrade-bounds - Output upgrade bounds for available sectors
USAGE:
lotus-miner sectors list upgrade-bounds [command options] [arguments...]
OPTIONS:
--buckets value (default: 25)
--csv output machine-readable values (default: false)
--deal-terms bucket by how many deal-sectors can start at a given expiration (default: false)
```


@ -38,7 +38,7 @@
#
# type: []string
# env var: LOTUS_LIBP2P_LISTENADDRESSES
#ListenAddresses = ["/ip4/0.0.0.0/tcp/0", "/ip6/::/tcp/0"]
#ListenAddresses = ["/ip4/0.0.0.0/tcp/0", "/ip6/::/tcp/0", "/ip4/0.0.0.0/udp/0/quic-v1", "/ip6/::/udp/0/quic-v1", "/ip4/0.0.0.0/udp/0/quic-v1/webtransport", "/ip6/::/udp/0/quic-v1/webtransport"]
# Addresses to explicitly announce to other peers. If not specified,
# all interface addresses are announced


@ -38,7 +38,7 @@
#
# type: []string
# env var: LOTUS_LIBP2P_LISTENADDRESSES
#ListenAddresses = ["/ip4/0.0.0.0/tcp/0", "/ip6/::/tcp/0"]
#ListenAddresses = ["/ip4/0.0.0.0/tcp/0", "/ip6/::/tcp/0", "/ip4/0.0.0.0/udp/0/quic-v1", "/ip6/::/udp/0/quic-v1", "/ip4/0.0.0.0/udp/0/quic-v1/webtransport", "/ip6/::/udp/0/quic-v1/webtransport"]
# Addresses to explicitly announce to other peers. If not specified,
# all interface addresses are announced
@ -515,15 +515,7 @@
# env var: LOTUS_SEALING_MINUPGRADESECTOREXPIRATION
#MinUpgradeSectorExpiration = 0
# When set to a non-zero value, minimum number of epochs until sector expiration above which upgrade candidates will
# be selected based on lowest initial pledge.
#
# Target sector expiration is calculated by looking at the input deal queue, sorting it by deal expiration, and
# selecting N deals from the queue up to sector size. The target expiration will be Nth deal end epoch, or in case
# where there weren't enough deals to fill a sector, DealMaxDuration (540 days = 1555200 epochs)
#
# Setting this to a high value (for example to maximum deal duration - 1555200) will disable selection based on
# initial pledge - upgrade sectors will always be chosen based on longest expiration
# DEPRECATED: Target expiration is no longer used
#
# type: uint64
# env var: LOTUS_SEALING_MINTARGETUPGRADESECTOREXPIRATION


@ -43,6 +43,9 @@ const (
// TargetAPI defines the API methods that the Node depends on
// (to make it easy to mock for tests)
type TargetAPI interface {
MpoolPending(context.Context, types.TipSetKey) ([]*types.SignedMessage, error)
ChainGetBlock(context.Context, cid.Cid) (*types.BlockHeader, error)
MinerGetBaseInfo(context.Context, address.Address, abi.ChainEpoch, types.TipSetKey) (*api.MiningBaseInfo, error)
GasEstimateGasPremium(context.Context, uint64, address.Address, int64, types.TipSetKey) (types.BigInt, error)
StateReplay(context.Context, types.TipSetKey, cid.Cid) (*api.InvocResult, error)
StateMinerSectorCount(context.Context, address.Address, types.TipSetKey) (api.MinerSectors, error)


@ -23,6 +23,33 @@ import (
"github.com/filecoin-project/lotus/node/modules/dtypes"
)
func (gw *Node) MpoolPending(ctx context.Context, tsk types.TipSetKey) ([]*types.SignedMessage, error) {
if err := gw.limit(ctx, stateRateLimitTokens); err != nil {
return nil, err
}
if err := gw.checkTipsetKey(ctx, tsk); err != nil {
return nil, err
}
return gw.target.MpoolPending(ctx, tsk)
}
func (gw *Node) ChainGetBlock(ctx context.Context, c cid.Cid) (*types.BlockHeader, error) {
if err := gw.limit(ctx, chainRateLimitTokens); err != nil {
return nil, err
}
return gw.target.ChainGetBlock(ctx, c)
}
func (gw *Node) MinerGetBaseInfo(ctx context.Context, addr address.Address, h abi.ChainEpoch, tsk types.TipSetKey) (*api.MiningBaseInfo, error) {
if err := gw.limit(ctx, stateRateLimitTokens); err != nil {
return nil, err
}
if err := gw.checkTipsetKey(ctx, tsk); err != nil {
return nil, err
}
return gw.target.MinerGetBaseInfo(ctx, addr, h, tsk)
}
func (gw *Node) StateReplay(ctx context.Context, tsk types.TipSetKey, c cid.Cid) (*api.InvocResult, error) {
if err := gw.limit(ctx, chainRateLimitTokens); err != nil {
return nil, err

go.mod (2 lines changed)

@ -41,7 +41,7 @@ require (
github.com/filecoin-project/go-data-transfer/v2 v2.0.0-rc7
github.com/filecoin-project/go-fil-commcid v0.1.0
github.com/filecoin-project/go-fil-commp-hashhash v0.1.0
github.com/filecoin-project/go-fil-markets v1.28.1
github.com/filecoin-project/go-fil-markets v1.28.2
github.com/filecoin-project/go-jsonrpc v0.3.1
github.com/filecoin-project/go-padreader v0.0.1
github.com/filecoin-project/go-paramfetch v0.0.4

go.sum (4 lines changed)

@ -321,8 +321,8 @@ github.com/filecoin-project/go-fil-commcid v0.1.0 h1:3R4ds1A9r6cr8mvZBfMYxTS88Oq
github.com/filecoin-project/go-fil-commcid v0.1.0/go.mod h1:Eaox7Hvus1JgPrL5+M3+h7aSPHc0cVqpSxA+TxIEpZQ=
github.com/filecoin-project/go-fil-commp-hashhash v0.1.0 h1:imrrpZWEHRnNqqv0tN7LXep5bFEVOVmQWHJvl2mgsGo=
github.com/filecoin-project/go-fil-commp-hashhash v0.1.0/go.mod h1:73S8WSEWh9vr0fDJVnKADhfIv/d6dCbAGaAGWbdJEI8=
github.com/filecoin-project/go-fil-markets v1.28.1 h1:2iq1o/ZY1RSbVU8OX7nwWO66SYtdjKEngshc4Jeua/Y=
github.com/filecoin-project/go-fil-markets v1.28.1/go.mod h1:qy9LNu9t77I184VB6Pa4WKRtGfB8Vl0t8zfOLHkDqWY=
github.com/filecoin-project/go-fil-markets v1.28.2 h1:Ev9o8BYow+lo97Bwc6oOmZ2OxdiHeIDCQsfF/w/Vldc=
github.com/filecoin-project/go-fil-markets v1.28.2/go.mod h1:qy9LNu9t77I184VB6Pa4WKRtGfB8Vl0t8zfOLHkDqWY=
github.com/filecoin-project/go-hamt-ipld v0.1.5 h1:uoXrKbCQZ49OHpsTCkrThPNelC4W3LPEk0OrS/ytIBM=
github.com/filecoin-project/go-hamt-ipld v0.1.5/go.mod h1:6Is+ONR5Cd5R6XZoCse1CWaXZc0Hdb/JeX+EQCQzX24=
github.com/filecoin-project/go-hamt-ipld/v2 v2.0.0 h1:b3UDemBYN2HNfk3KOXNuxgTTxlWi3xVvbQP0IT38fvM=


@ -48,7 +48,7 @@ func TestDMLevelPartialRetrieval(t *testing.T) {
ctx := context.Background()
kit.QuietMiningLogs()
client, miner, ens := kit.EnsembleMinimal(t, kit.ThroughRPC(), kit.MockProofs())
client, miner, ens := kit.EnsembleMinimal(t, kit.ThroughRPC())
dh := kit.NewDealHarness(t, client, miner, miner)
ens.InterconnectAll().BeginMining(50 * time.Millisecond)


@ -184,7 +184,8 @@ func (bm *BlockMiner) MineBlocksMustPost(ctx context.Context, blocktime time.Dur
dlinfo, err := bm.miner.FullNode.StateMinerProvingDeadline(ctx, bm.miner.ActorAddr, ts.Key())
require.NoError(bm.t, err)
if ts.Height()+1+abi.ChainEpoch(nulls) >= dlinfo.Last() { // Next block brings us past the last epoch in dline, we need to wait for miner to post
if ts.Height()+5+abi.ChainEpoch(nulls) >= dlinfo.Last() { // Next block brings us past the last epoch in dline, we need to wait for miner to post
bm.t.Logf("forcing post to get in before deadline closes at %d", dlinfo.Last())
bm.forcePoSt(ctx, ts, dlinfo)
}
@ -216,7 +217,8 @@ func (bm *BlockMiner) MineBlocksMustPost(ctx context.Context, blocktime time.Dur
}
if !success {
// if we are mining a new null block and it brings us past deadline boundary we need to wait for miner to post
if ts.Height()+1+abi.ChainEpoch(nulls+i) >= dlinfo.Last() {
if ts.Height()+5+abi.ChainEpoch(nulls+i) >= dlinfo.Last() {
bm.t.Logf("forcing post to get in before deadline closes at %d", dlinfo.Last())
bm.forcePoSt(ctx, ts, dlinfo)
}
}


@ -5,6 +5,7 @@ import (
"context"
"fmt"
"testing"
"time"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
@ -220,14 +221,17 @@ func TestNetBlockIPAddr(t *testing.T) {
firstAddrInfo, _ := firstNode.NetAddrsListen(ctx)
secondAddrInfo, _ := secondNode.NetAddrsListen(ctx)
var secondNodeIPs []string
secondNodeIPsMap := map[string]struct{}{} // so we can deduplicate
for _, addr := range secondAddrInfo.Addrs {
ip, err := manet.ToIP(addr)
if err != nil {
continue
}
secondNodeIPs = append(secondNodeIPs, ip.String())
secondNodeIPsMap[ip.String()] = struct{}{}
}
var secondNodeIPs []string
for s := range secondNodeIPsMap {
secondNodeIPs = append(secondNodeIPs, s)
}
// Sanity check that we're not already connected somehow
@ -243,6 +247,8 @@ func TestNetBlockIPAddr(t *testing.T) {
list, err := firstNode.NetBlockList(ctx)
require.NoError(t, err)
fmt.Println(list.IPAddrs)
fmt.Println(secondNodeIPs)
require.Equal(t, len(list.IPAddrs), len(secondNodeIPs), "expected %d blocked IPs", len(secondNodeIPs))
for _, blockedIP := range list.IPAddrs {
found := false
@ -256,10 +262,14 @@ func TestNetBlockIPAddr(t *testing.T) {
require.True(t, found, "blocked IP %s is not one of secondNodeIPs", blockedIP)
}
require.Error(t, secondNode.NetConnect(ctx, firstAddrInfo), "shouldn't be able to connect to second node")
connectedness, err = secondNode.NetConnectedness(ctx, firstAddrInfo.ID)
require.NoError(t, err, "failed to determine connectedness")
require.NotEqual(t, connectedness, network.Connected)
// a QUIC connection might still succeed when gated, but will be killed right after the handshake
_ = secondNode.NetConnect(ctx, firstAddrInfo)
require.Eventually(t, func() bool {
connectedness, err = secondNode.NetConnectedness(ctx, firstAddrInfo.ID)
require.NoError(t, err, "failed to determine connectedness")
return connectedness != network.Connected
}, time.Second*5, time.Millisecond*10)
// stm: @NETWORK_COMMON_BLOCK_REMOVE_001
err = firstNode.NetBlockRemove(ctx, api.NetBlockList{IPAddrs: secondNodeIPs})


@ -0,0 +1,44 @@
package readerutil
import (
"io"
"os"
)
// NewReadSeekerFromReaderAt returns a new io.ReadSeeker from an io.ReaderAt.
// The returned io.ReadSeeker will read from the io.ReaderAt starting at the
// given base offset.
func NewReadSeekerFromReaderAt(readerAt io.ReaderAt, base int64) io.ReadSeeker {
return &readSeekerFromReaderAt{
readerAt: readerAt,
base: base,
pos: 0,
}
}
type readSeekerFromReaderAt struct {
readerAt io.ReaderAt
base int64
pos int64
}
func (rs *readSeekerFromReaderAt) Read(p []byte) (n int, err error) {
n, err = rs.readerAt.ReadAt(p, rs.pos+rs.base)
rs.pos += int64(n)
return n, err
}
func (rs *readSeekerFromReaderAt) Seek(offset int64, whence int) (int64, error) {
switch whence {
case io.SeekStart:
rs.pos = offset
case io.SeekCurrent:
rs.pos += offset
case io.SeekEnd:
return 0, io.ErrUnexpectedEOF
default:
return 0, os.ErrInvalid
}
return rs.pos, nil
}
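A short usage sketch for the new helper: anything that implements io.ReaderAt (an *os.File here) can be consumed as an io.ReadSeeker whose position is offset by the given base. The file name and the lib/readerutil import path are assumptions made for illustration.

```go
package main

import (
	"fmt"
	"io"
	"os"

	"github.com/filecoin-project/lotus/lib/readerutil"
)

func main() {
	f, err := os.Open("piece.bin") // placeholder file; any io.ReaderAt works
	if err != nil {
		panic(err)
	}
	defer f.Close() //nolint:errcheck

	// Every read through rs starts 128 bytes into the underlying file.
	rs := readerutil.NewReadSeekerFromReaderAt(f, 128)

	buf := make([]byte, 16)
	if _, err := io.ReadFull(rs, buf); err != nil {
		panic(err)
	}

	// Seeks are relative to the base offset, not the start of the file;
	// note that io.SeekEnd is not supported by this wrapper.
	if _, err := rs.Seek(32, io.SeekStart); err != nil {
		panic(err)
	}
	fmt.Printf("read %d bytes, now positioned at base+32\n", len(buf))
}
```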


@ -61,6 +61,10 @@ var (
// vm execution
ExecutionLane, _ = tag.NewKey("lane")
// piecereader
PRReadType, _ = tag.NewKey("pr_type") // seq / rand
PRReadSize, _ = tag.NewKey("pr_size") // small / big
)
// Measures
@ -153,8 +157,9 @@ var (
SchedCycleOpenWindows = stats.Int64("sched/assigner_cycle_open_window", "Number of open windows in scheduling cycles", stats.UnitDimensionless)
SchedCycleQueueSize = stats.Int64("sched/assigner_cycle_task_queue_entry", "Number of task queue entries in scheduling cycles", stats.UnitDimensionless)
DagStorePRInitCount = stats.Int64("dagstore/pr_init_count", "PieceReader init count", stats.UnitDimensionless)
DagStorePRBytesRequested = stats.Int64("dagstore/pr_requested_bytes", "PieceReader requested bytes", stats.UnitBytes)
DagStorePRInitCount = stats.Int64("dagstore/pr_init_count", "PieceReader init count", stats.UnitDimensionless)
DagStorePRBytesRequested = stats.Int64("dagstore/pr_requested_bytes", "PieceReader requested bytes", stats.UnitBytes)
DagStorePRBytesDiscarded = stats.Int64("dagstore/pr_discarded_bytes", "PieceReader discarded bytes", stats.UnitBytes)
DagStorePRDiscardCount = stats.Int64("dagstore/pr_discard_count", "PieceReader discard count", stats.UnitDimensionless)
DagStorePRSeekBackCount = stats.Int64("dagstore/pr_seek_back_count", "PieceReader seek back count", stats.UnitDimensionless)
@ -162,6 +167,12 @@ var (
DagStorePRSeekBackBytes = stats.Int64("dagstore/pr_seek_back_bytes", "PieceReader seek back bytes", stats.UnitBytes)
DagStorePRSeekForwardBytes = stats.Int64("dagstore/pr_seek_forward_bytes", "PieceReader seek forward bytes", stats.UnitBytes)
DagStorePRAtHitBytes = stats.Int64("dagstore/pr_at_hit_bytes", "PieceReader ReadAt bytes from cache", stats.UnitBytes)
DagStorePRAtHitCount = stats.Int64("dagstore/pr_at_hit_count", "PieceReader ReadAt from cache hits", stats.UnitDimensionless)
DagStorePRAtCacheFillCount = stats.Int64("dagstore/pr_at_cache_fill_count", "PieceReader ReadAt full cache fill count", stats.UnitDimensionless)
DagStorePRAtReadBytes = stats.Int64("dagstore/pr_at_read_bytes", "PieceReader ReadAt bytes read from source", stats.UnitBytes) // PRReadSize tag
DagStorePRAtReadCount = stats.Int64("dagstore/pr_at_read_count", "PieceReader ReadAt reads from source", stats.UnitDimensionless) // PRReadSize tag
// splitstore
SplitstoreMiss = stats.Int64("splitstore/miss", "Number of misses in hotstore access", stats.UnitDimensionless)
SplitstoreCompactionTimeSeconds = stats.Float64("splitstore/compaction_time", "Compaction time in seconds", stats.UnitSeconds)
@ -487,6 +498,7 @@ var (
DagStorePRBytesRequestedView = &view.View{
Measure: DagStorePRBytesRequested,
Aggregation: view.Sum(),
TagKeys: []tag.Key{PRReadType},
}
DagStorePRBytesDiscardedView = &view.View{
Measure: DagStorePRBytesDiscarded,
@ -513,6 +525,29 @@ var (
Aggregation: view.Sum(),
}
DagStorePRAtHitBytesView = &view.View{
Measure: DagStorePRAtHitBytes,
Aggregation: view.Sum(),
}
DagStorePRAtHitCountView = &view.View{
Measure: DagStorePRAtHitCount,
Aggregation: view.Count(),
}
DagStorePRAtCacheFillCountView = &view.View{
Measure: DagStorePRAtCacheFillCount,
Aggregation: view.Count(),
}
DagStorePRAtReadBytesView = &view.View{
Measure: DagStorePRAtReadBytes,
Aggregation: view.Sum(),
TagKeys: []tag.Key{PRReadSize},
}
DagStorePRAtReadCountView = &view.View{
Measure: DagStorePRAtReadCount,
Aggregation: view.Count(),
TagKeys: []tag.Key{PRReadSize},
}
// splitstore
SplitstoreMissView = &view.View{
Measure: SplitstoreMiss,
@ -779,6 +814,11 @@ var MinerNodeViews = append([]*view.View{
DagStorePRSeekForwardCountView,
DagStorePRSeekBackBytesView,
DagStorePRSeekForwardBytesView,
DagStorePRAtHitBytesView,
DagStorePRAtHitCountView,
DagStorePRAtCacheFillCountView,
DagStorePRAtReadBytesView,
DagStorePRAtReadCountView,
}, DefaultViews...)
var GatewayNodeViews = append([]*view.View{
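As a usage note, here is a minimal, hypothetical sketch of how the new ReadAt measures and tag keys declared above might be recorded with OpenCensus from the piece reader. The helper name, the call site, and the small/big threshold are assumptions; the measures, tag keys, and stats/tag calls are the ones this file already relies on.

```go
package main

import (
	"context"

	"go.opencensus.io/stats"
	"go.opencensus.io/tag"

	"github.com/filecoin-project/lotus/metrics"
)

// recordSourceReadAt is a hypothetical helper: it classifies a ReadAt served
// from the underlying source (a cache miss) as small or big and records the
// byte count and call count against the new measures, tagged with PRReadSize.
func recordSourceReadAt(ctx context.Context, n int) {
	sizeClass := "small"
	if n >= 4096 { // threshold chosen purely for illustration
		sizeClass = "big"
	}
	_ = stats.RecordWithTags(ctx,
		[]tag.Mutator{tag.Upsert(metrics.PRReadSize, sizeClass)},
		metrics.DagStorePRAtReadBytes.M(int64(n)),
		metrics.DagStorePRAtReadCount.M(1),
	)
}

func main() {
	recordSourceReadAt(context.Background(), 8192)
}
```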


@ -57,6 +57,10 @@ func defCommon() Common {
ListenAddresses: []string{
"/ip4/0.0.0.0/tcp/0",
"/ip6/::/tcp/0",
"/ip4/0.0.0.0/udp/0/quic-v1",
"/ip6/::/udp/0/quic-v1",
"/ip4/0.0.0.0/udp/0/quic-v1/webtransport",
"/ip6/::/udp/0/quic-v1/webtransport",
},
AnnounceAddresses: []string{},
NoAnnounceAddresses: []string{},


@ -1106,15 +1106,7 @@ required to have expiration of at least the soonest-ending deal`,
Name: "MinTargetUpgradeSectorExpiration",
Type: "uint64",
Comment: `When set to a non-zero value, minimum number of epochs until sector expiration above which upgrade candidates will
be selected based on lowest initial pledge.
Target sector expiration is calculated by looking at the input deal queue, sorting it by deal expiration, and
selecting N deals from the queue up to sector size. The target expiration will be Nth deal end epoch, or in case
where there weren't enough deals to fill a sector, DealMaxDuration (540 days = 1555200 epochs)
Setting this to a high value (for example to maximum deal duration - 1555200) will disable selection based on
initial pledge - upgrade sectors will always be chosen based on longest expiration`,
Comment: `DEPRECATED: Target expiration is no longer used`,
},
{
Name: "CommittedCapacitySectorLifetime",


@ -352,15 +352,7 @@ type SealingConfig struct {
// required to have expiration of at least the soonest-ending deal
MinUpgradeSectorExpiration uint64
// When set to a non-zero value, minimum number of epochs until sector expiration above which upgrade candidates will
// be selected based on lowest initial pledge.
//
// Target sector expiration is calculated by looking at the input deal queue, sorting it by deal expiration, and
// selecting N deals from the queue up to sector size. The target expiration will be Nth deal end epoch, or in case
// where there weren't enough deals to fill a sector, DealMaxDuration (540 days = 1555200 epochs)
//
// Setting this to a high value (for example to maximum deal duration - 1555200) will disable selection based on
// initial pledge - upgrade sectors will always be chosen based on longest expiration
// DEPRECATED: Target expiration is no longer used
MinTargetUpgradeSectorExpiration uint64
// CommittedCapacitySectorLifetime is the duration a Committed Capacity (CC) sector will


@ -180,6 +180,7 @@ func (m *StateModule) StateMinerInfo(ctx context.Context, actor address.Address,
SectorSize: info.SectorSize,
WindowPoStPartitionSectors: info.WindowPoStPartitionSectors,
ConsensusFaultElapsed: info.ConsensusFaultElapsed,
PendingOwnerAddress: info.PendingOwnerAddress,
Beneficiary: info.Beneficiary,
BeneficiaryTerm: &info.BeneficiaryTerm,
PendingBeneficiaryTerm: info.PendingBeneficiaryTerm,


@ -983,19 +983,18 @@ func NewSetSealConfigFunc(r repo.LockedRepo) (dtypes.SetSealingConfigFunc, error
return func(cfg sealiface.Config) (err error) {
err = mutateSealingCfg(r, func(c config.SealingConfiger) {
newCfg := config.SealingConfig{
MaxWaitDealsSectors: cfg.MaxWaitDealsSectors,
MaxSealingSectors: cfg.MaxSealingSectors,
MaxSealingSectorsForDeals: cfg.MaxSealingSectorsForDeals,
PreferNewSectorsForDeals: cfg.PreferNewSectorsForDeals,
MaxUpgradingSectors: cfg.MaxUpgradingSectors,
CommittedCapacitySectorLifetime: config.Duration(cfg.CommittedCapacitySectorLifetime),
WaitDealsDelay: config.Duration(cfg.WaitDealsDelay),
MakeNewSectorForDeals: cfg.MakeNewSectorForDeals,
MinUpgradeSectorExpiration: cfg.MinUpgradeSectorExpiration,
MinTargetUpgradeSectorExpiration: cfg.MinTargetUpgradeSectorExpiration,
MakeCCSectorsAvailable: cfg.MakeCCSectorsAvailable,
AlwaysKeepUnsealedCopy: cfg.AlwaysKeepUnsealedCopy,
FinalizeEarly: cfg.FinalizeEarly,
MaxWaitDealsSectors: cfg.MaxWaitDealsSectors,
MaxSealingSectors: cfg.MaxSealingSectors,
MaxSealingSectorsForDeals: cfg.MaxSealingSectorsForDeals,
PreferNewSectorsForDeals: cfg.PreferNewSectorsForDeals,
MaxUpgradingSectors: cfg.MaxUpgradingSectors,
CommittedCapacitySectorLifetime: config.Duration(cfg.CommittedCapacitySectorLifetime),
WaitDealsDelay: config.Duration(cfg.WaitDealsDelay),
MakeNewSectorForDeals: cfg.MakeNewSectorForDeals,
MinUpgradeSectorExpiration: cfg.MinUpgradeSectorExpiration,
MakeCCSectorsAvailable: cfg.MakeCCSectorsAvailable,
AlwaysKeepUnsealedCopy: cfg.AlwaysKeepUnsealedCopy,
FinalizeEarly: cfg.FinalizeEarly,
CollateralFromMinerBalance: cfg.CollateralFromMinerBalance,
AvailableBalanceBuffer: types.FIL(cfg.AvailableBalanceBuffer),
@ -1027,13 +1026,12 @@ func NewSetSealConfigFunc(r repo.LockedRepo) (dtypes.SetSealingConfigFunc, error
func ToSealingConfig(dealmakingCfg config.DealmakingConfig, sealingCfg config.SealingConfig) sealiface.Config {
return sealiface.Config{
MaxWaitDealsSectors: sealingCfg.MaxWaitDealsSectors,
MaxSealingSectors: sealingCfg.MaxSealingSectors,
MaxSealingSectorsForDeals: sealingCfg.MaxSealingSectorsForDeals,
PreferNewSectorsForDeals: sealingCfg.PreferNewSectorsForDeals,
MinUpgradeSectorExpiration: sealingCfg.MinUpgradeSectorExpiration,
MinTargetUpgradeSectorExpiration: sealingCfg.MinTargetUpgradeSectorExpiration,
MaxUpgradingSectors: sealingCfg.MaxUpgradingSectors,
MaxWaitDealsSectors: sealingCfg.MaxWaitDealsSectors,
MaxSealingSectors: sealingCfg.MaxSealingSectors,
MaxSealingSectorsForDeals: sealingCfg.MaxSealingSectorsForDeals,
PreferNewSectorsForDeals: sealingCfg.PreferNewSectorsForDeals,
MinUpgradeSectorExpiration: sealingCfg.MinUpgradeSectorExpiration,
MaxUpgradingSectors: sealingCfg.MaxUpgradingSectors,
StartEpochSealingBuffer: abi.ChainEpoch(dealmakingCfg.StartEpochSealingBuffer),
MakeNewSectorForDeals: sealingCfg.MakeNewSectorForDeals,


@ -3,6 +3,7 @@ package paths
import (
"bytes"
"encoding/json"
"io"
"net/http"
"os"
"strconv"
@ -35,7 +36,7 @@ func (d *DefaultPartialFileHandler) HasAllocated(pf *partialfile.PartialFile, of
return pf.HasAllocated(offset, size)
}
func (d *DefaultPartialFileHandler) Reader(pf *partialfile.PartialFile, offset storiface.PaddedByteIndex, size abi.PaddedPieceSize) (*os.File, error) {
func (d *DefaultPartialFileHandler) Reader(pf *partialfile.PartialFile, offset storiface.PaddedByteIndex, size abi.PaddedPieceSize) (io.Reader, error) {
return pf.Reader(offset, size)
}

View File

@ -207,8 +207,8 @@ func TestRemoteGetAllocated(t *testing.T) {
pfhandler := mocks.NewMockPartialFileHandler(mockCtrl)
handler := &paths.FetchHandler{
lstore,
pfhandler,
Local: lstore,
PfHandler: pfhandler,
}
// run http server

View File

@ -2,7 +2,7 @@ package paths
import (
"context"
"os"
"io"
"github.com/filecoin-project/go-state-types/abi"
@ -24,7 +24,7 @@ type PartialFileHandler interface {
HasAllocated(pf *partialfile.PartialFile, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize) (bool, error)
// Reader returns a file from which we can read the unsealed piece in the partial file.
Reader(pf *partialfile.PartialFile, offset storiface.PaddedByteIndex, size abi.PaddedPieceSize) (*os.File, error)
Reader(pf *partialfile.PartialFile, offset storiface.PaddedByteIndex, size abi.PaddedPieceSize) (io.Reader, error)
// Close closes the partial file
Close(pf *partialfile.PartialFile) error

View File

@ -5,7 +5,7 @@
package mocks
import (
os "os"
io "io"
reflect "reflect"
gomock "github.com/golang/mock/gomock"
@ -84,10 +84,10 @@ func (mr *MockPartialFileHandlerMockRecorder) OpenPartialFile(arg0, arg1 interfa
}
// Reader mocks base method.
func (m *MockPartialFileHandler) Reader(arg0 *partialfile.PartialFile, arg1 storiface.PaddedByteIndex, arg2 abi.PaddedPieceSize) (*os.File, error) {
func (m *MockPartialFileHandler) Reader(arg0 *partialfile.PartialFile, arg1 storiface.PaddedByteIndex, arg2 abi.PaddedPieceSize) (io.Reader, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "Reader", arg0, arg1, arg2)
ret0, _ := ret[0].(*os.File)
ret0, _ := ret[0].(io.Reader)
ret1, _ := ret[1].(error)
return ret0, ret1
}

View File

@ -14,6 +14,7 @@ import (
"sort"
"strings"
"sync"
"time"
"github.com/hashicorp/go-multierror"
"golang.org/x/xerrors"
@ -21,6 +22,7 @@ import (
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/lotus/storage/sealer/fsutil"
"github.com/filecoin-project/lotus/storage/sealer/partialfile"
"github.com/filecoin-project/lotus/storage/sealer/storiface"
)
@ -28,6 +30,10 @@ var FetchTempSubdir = "fetching"
var CopyBuf = 1 << 20
// LocalReaderTimeout is the timeout for keeping local reader files open without
// any read activity.
var LocalReaderTimeout = 5 * time.Second
type Remote struct {
local Store
index SectorIndex
@ -563,7 +569,7 @@ func (r *Remote) CheckIsUnsealed(ctx context.Context, s storiface.SectorRef, off
// 1. no worker (local worker included) has an unsealed file for the given sector OR
// 2. no worker (local worker included) has the unsealed piece in their unsealed sector file.
// Will return a nil reader and a nil error in such a case.
func (r *Remote) Reader(ctx context.Context, s storiface.SectorRef, offset, size abi.PaddedPieceSize) (func(startOffsetAligned storiface.PaddedByteIndex) (io.ReadCloser, error), error) {
func (r *Remote) Reader(ctx context.Context, s storiface.SectorRef, offset, size abi.PaddedPieceSize) (func(startOffsetAligned, endOffsetAligned storiface.PaddedByteIndex) (io.ReadCloser, error), error) {
ft := storiface.FTUnsealed
// check if we have the unsealed sector file locally
@ -602,20 +608,67 @@ func (r *Remote) Reader(ctx context.Context, s storiface.SectorRef, offset, size
if has {
log.Infof("returning piece reader for local unsealed piece sector=%+v, (offset=%d, size=%d)", s.ID, offset, size)
return func(startOffsetAligned storiface.PaddedByteIndex) (io.ReadCloser, error) {
// don't reuse between readers unless closed
f := pf
pf = nil
// refs keeps track of the currently opened pf
// if they drop to 0 for longer than LocalReaderTimeout, pf will be closed
var refsLk sync.Mutex
refs := 0
if f == nil {
f, err = r.pfHandler.OpenPartialFile(abi.PaddedPieceSize(ssize), path)
if err != nil {
return nil, xerrors.Errorf("opening partial file: %w", err)
cleanupIdle := func() {
lastRefs := 1
for range time.After(LocalReaderTimeout) {
refsLk.Lock()
if refs == 0 && lastRefs == 0 && pf != nil { // pf can't really be nil here, but better be safe
log.Infow("closing idle partial file", "path", path)
err := pf.Close()
if err != nil {
log.Errorw("closing idle partial file", "path", path, "error", err)
}
pf = nil
refsLk.Unlock()
return
}
log.Debugf("local partial file (re)opened %s (+%d,%d)", path, offset, size)
lastRefs = refs
refsLk.Unlock()
}
}
getPF := func() (*partialfile.PartialFile, func() error, error) {
refsLk.Lock()
defer refsLk.Unlock()
if pf == nil {
// got closed in the meantime, reopen
var err error
pf, err = r.pfHandler.OpenPartialFile(abi.PaddedPieceSize(ssize), path)
if err != nil {
return nil, nil, xerrors.Errorf("reopening partial file: %w", err)
}
log.Debugf("local partial file reopened %s (+%d,%d)", path, offset, size)
go cleanupIdle()
}
r, err := r.pfHandler.Reader(f, storiface.PaddedByteIndex(offset)+startOffsetAligned, size-abi.PaddedPieceSize(startOffsetAligned))
refs++
return pf, func() error {
refsLk.Lock()
defer refsLk.Unlock()
refs--
return nil
}, nil
}
return func(startOffsetAligned, endOffsetAligned storiface.PaddedByteIndex) (io.ReadCloser, error) {
pf, done, err := getPF()
if err != nil {
return nil, xerrors.Errorf("getting partialfile handle: %w", err)
}
r, err := r.pfHandler.Reader(pf, storiface.PaddedByteIndex(offset)+startOffsetAligned, abi.PaddedPieceSize(endOffsetAligned-startOffsetAligned))
if err != nil {
return nil, err
}
@ -625,25 +678,7 @@ func (r *Remote) Reader(ctx context.Context, s storiface.SectorRef, offset, size
io.Closer
}{
Reader: r,
Closer: funcCloser(func() error {
// if we already have a reader cached, close this one
if pf != nil {
if f == nil {
return nil
}
if pf == f {
pf = nil
}
tmp := f
f = nil
return tmp.Close()
}
// otherwise stash it away for reuse
pf = f
return nil
}),
Closer: funcCloser(done),
}, nil
}, nil
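The closure above reference-counts the shared partial file and lets a background check close it once no reader has held a reference across a full timeout. A minimal, self-contained sketch of that pattern (a generic handle instead of *partialfile.PartialFile, a ticker for the periodic check, timings shortened for the demo):

package main

import (
    "fmt"
    "sync"
    "time"
)

// handle reference-counts a shared resource and closes it once it has been
// idle (zero refs) across two consecutive timeout checks, mirroring the
// refs/lastRefs logic in Remote.Reader.
type handle struct {
    mu   sync.Mutex
    refs int
    open bool
}

func (h *handle) acquire() (release func()) {
    h.mu.Lock()
    defer h.mu.Unlock()
    if !h.open {
        h.open = true // stand-in for OpenPartialFile
        go h.cleanupIdle()
    }
    h.refs++
    return func() {
        h.mu.Lock()
        h.refs--
        h.mu.Unlock()
    }
}

func (h *handle) cleanupIdle() {
    lastRefs := 1
    ticker := time.NewTicker(50 * time.Millisecond)
    defer ticker.Stop()
    for range ticker.C {
        h.mu.Lock()
        if h.refs == 0 && lastRefs == 0 {
            h.open = false // stand-in for pf.Close()
            h.mu.Unlock()
            return
        }
        lastRefs = h.refs
        h.mu.Unlock()
    }
}

func main() {
    h := &handle{}
    release := h.acquire()
    release()
    time.Sleep(150 * time.Millisecond) // idle across two checks
    h.mu.Lock()
    fmt.Println("still open:", h.open) // false
    h.mu.Unlock()
}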
@ -689,10 +724,10 @@ func (r *Remote) Reader(ctx context.Context, s storiface.SectorRef, offset, size
continue
}
return func(startOffsetAligned storiface.PaddedByteIndex) (io.ReadCloser, error) {
return func(startOffsetAligned, endOffsetAligned storiface.PaddedByteIndex) (io.ReadCloser, error) {
// readRemote fetches a reader that we can use to read the unsealed piece from the remote worker.
// It uses a ranged HTTP query to ensure we ONLY read the unsealed piece and not the entire unsealed file.
rd, err := r.readRemote(ctx, url, offset+abi.PaddedPieceSize(startOffsetAligned), size)
rd, err := r.readRemote(ctx, url, offset+abi.PaddedPieceSize(startOffsetAligned), offset+abi.PaddedPieceSize(endOffsetAligned))
if err != nil {
log.Warnw("reading from remote", "url", url, "error", err)
return nil, err
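readRemote is not shown in this diff, but the comment above says it issues a ranged HTTP request so only the unsealed piece is fetched rather than the whole unsealed file. A minimal sketch of such a ranged GET in Go (hypothetical URL and offsets; real code would also deal with auth headers and partial-range responses more carefully):

package main

import (
    "fmt"
    "io"
    "net/http"
)

// fetchRange requests bytes [start, end) from url using an HTTP Range header.
// Servers that honour the range reply with 206 Partial Content.
func fetchRange(url string, start, end int64) (io.ReadCloser, error) {
    req, err := http.NewRequest(http.MethodGet, url, nil)
    if err != nil {
        return nil, err
    }
    // Range is inclusive on both ends, hence end-1.
    req.Header.Set("Range", fmt.Sprintf("bytes=%d-%d", start, end-1))

    resp, err := http.DefaultClient.Do(req)
    if err != nil {
        return nil, err
    }
    if resp.StatusCode != http.StatusPartialContent && resp.StatusCode != http.StatusOK {
        resp.Body.Close()
        return nil, fmt.Errorf("unexpected status %d", resp.StatusCode)
    }
    return resp.Body, nil
}

func main() {
    rd, err := fetchRange("http://example.com/unsealed-sector", 1024, 4096)
    if err != nil {
        fmt.Println("fetch failed:", err)
        return
    }
    defer rd.Close()
    n, _ := io.Copy(io.Discard, rd)
    fmt.Println("read", n, "bytes")
}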

View File

@ -477,7 +477,7 @@ func TestReader(t *testing.T) {
require.Nil(t, rdg)
require.Contains(t, err.Error(), tc.errStr)
} else {
rd, err = rdg(0)
rd, err = rdg(0, storiface.PaddedByteIndex(size))
require.Error(t, err)
require.Nil(t, rd)
require.Contains(t, err.Error(), tc.errStr)
@ -490,7 +490,7 @@ func TestReader(t *testing.T) {
require.Nil(t, rd)
} else {
require.NotNil(t, rdg)
rd, err := rdg(0)
rd, err := rdg(0, storiface.PaddedByteIndex(size))
require.NoError(t, err)
defer func() {

View File

@ -417,6 +417,13 @@ func (m *Sealing) addPendingPiece(ctx context.Context, size abi.UnpaddedPieceSiz
close(pp.doneCh)
}
log.Debugw("new pending piece", "dealId", deal.DealID,
"piece", deal.DealProposal.PieceCID,
"size", size,
"dealStart", deal.DealSchedule.StartEpoch,
"dealEnd", deal.DealSchedule.EndEpoch,
"termEnd", ct.claimTermEnd)
m.pendingPieces[proposalCID(deal)] = pp
go func() {
defer m.inputLk.Unlock()
@ -694,7 +701,6 @@ func (m *Sealing) maybeUpgradeSector(ctx context.Context, sp abi.RegisteredSealP
return &pieceBounds[f-1]
}
targetExpirationEpoch := ts.Height() + abi.ChainEpoch(cfg.MinTargetUpgradeSectorExpiration)
minExpirationEpoch := ts.Height() + abi.ChainEpoch(cfg.MinUpgradeSectorExpiration)
var candidate abi.SectorID
@ -732,33 +738,25 @@ func (m *Sealing) maybeUpgradeSector(ctx context.Context, sp abi.RegisteredSealP
continue
}
// if the sector has less than one sector worth of candidate deals, and
// the best candidate has more candidate deals, this sector isn't better
if pb.dealBytesInBound.Padded() < abi.PaddedPieceSize(ssize) {
if bestDealBytes > pb.dealBytesInBound.Padded() {
continue
}
}
// if best is below target, we want larger expirations
// if best is above target, we want lower pledge, but only if still above target
// todo: after nv17 "target expiration" doesn't really make that much sense
// (tho to be fair it doesn't make too much sense now either)
// we probably want the lowest expiration that's still above the configured
// minimum, and can fit most candidate deals
if bestExpiration < targetExpirationEpoch {
if expirationEpoch > bestExpiration && slowChecks(s.Number) {
bestExpiration = expirationEpoch
bestPledge = pledge
bestDealBytes = pb.dealBytesInBound.Padded()
candidate = s
}
if pb.dealBytesInBound.Padded() == 0 {
log.Debugw("skipping available sector", "sector", s.Number, "reason", "no deals in expiration bounds", "expiration", expirationEpoch)
continue
}
if expirationEpoch >= targetExpirationEpoch && pledge.LessThan(bestPledge) && slowChecks(s.Number) {
// if the sector has less than one sector worth of candidate deals, and
// the best candidate has more candidate deals, this sector isn't better
lessThanSectorOfData := pb.dealBytesInBound.Padded() < abi.PaddedPieceSize(ssize)
moreDealsThanBest := pb.dealBytesInBound.Padded() > bestDealBytes
// we want lower pledge, but only if we have more than one sector worth of deals
preferDueToDealSize := lessThanSectorOfData && moreDealsThanBest
preferDueToPledge := pledge.LessThan(bestPledge) && !lessThanSectorOfData
prefer := preferDueToDealSize || preferDueToPledge
if prefer && slowChecks(s.Number) {
bestExpiration = expirationEpoch
bestPledge = pledge
bestDealBytes = pb.dealBytesInBound.Padded()
@ -767,12 +765,12 @@ func (m *Sealing) maybeUpgradeSector(ctx context.Context, sp abi.RegisteredSealP
}
if bestExpiration < minExpirationEpoch {
log.Infow("Not upgrading any sectors", "available", len(m.available), "pieces", len(m.pendingPieces), "bestExp", bestExpiration, "target", targetExpirationEpoch, "min", minExpirationEpoch, "candidate", candidate)
log.Infow("Not upgrading any sectors", "available", len(m.available), "pieces", len(m.pendingPieces), "bestExp", bestExpiration, "min", minExpirationEpoch, "candidate", candidate)
// didn't find a good sector / no sectors were available
return false, nil
}
log.Infow("Upgrading sector", "number", candidate.Number, "type", "deal", "proofType", sp, "expiration", bestExpiration, "pledge", types.FIL(bestPledge))
log.Infow("Upgrading sector", "number", candidate.Number, "type", "deal", "proofType", sp, "expiration", bestExpiration, "pledge", types.FIL(bestPledge), "pieces", len(m.pendingPieces), "dealBytesAtExp", bestDealBytes)
delete(m.available, candidate)
m.nextDealSector = &candidate.Number
return true, m.sectors.Send(uint64(candidate.Number), SectorStartCCUpdate{})
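The preference rule the loop now applies can be read as one small predicate. Restated on plain integers (padded sizes and pledge collapsed to uint64/int64 stand-ins, values illustrative only):

package main

import "fmt"

// preferCandidate mirrors the preference logic in maybeUpgradeSector:
// a sector with less than a full sector of deal bytes only wins on deal
// bytes; otherwise it wins on lower pledge.
func preferCandidate(sectorSize, dealBytes, bestDealBytes uint64, pledge, bestPledge int64) bool {
    lessThanSectorOfData := dealBytes < sectorSize
    moreDealsThanBest := dealBytes > bestDealBytes
    preferDueToDealSize := lessThanSectorOfData && moreDealsThanBest
    preferDueToPledge := pledge < bestPledge && !lessThanSectorOfData
    return preferDueToDealSize || preferDueToPledge
}

func main() {
    const ssize = 32 << 30 // 32 GiB sector, illustrative
    // Half-full sector beats a quarter-full one on deal bytes, regardless of pledge.
    fmt.Println(preferCandidate(ssize, ssize/2, ssize/4, 100, 50)) // true
    // Two full sectors: the one with lower pledge wins.
    fmt.Println(preferCandidate(ssize, ssize, ssize, 40, 50)) // true
}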

View File

@ -22,8 +22,6 @@ type Config struct {
MinUpgradeSectorExpiration uint64
MinTargetUpgradeSectorExpiration uint64
MaxUpgradingSectors uint64
MakeNewSectorForDeals bool

View File

@ -8,6 +8,21 @@ import (
"github.com/filecoin-project/go-state-types/abi"
)
// UnpaddedFr32Chunk is the minimum amount of data which can be fr32-padded
// Fr32 padding inserts two zero bits every 254 bits, so the minimum amount of
// data which can be padded is 254 bits. 127 bytes is the smallest multiple of
// 254 bits which has a whole number of bytes.
const UnpaddedFr32Chunk abi.UnpaddedPieceSize = 127
// PaddedFr32Chunk is the size of an UnpaddedFr32Chunk chunk after fr32 padding
const PaddedFr32Chunk abi.PaddedPieceSize = 128
func init() {
if PaddedFr32Chunk != UnpaddedFr32Chunk.Padded() {
panic("bad math")
}
}
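To make the constants above concrete, here is a small standalone sketch of the fr32 size arithmetic (plain integer math only; the padded-size formula assumes the usual unpadded + unpadded/127 rule, which is what makes the init check hold):

package main

import "fmt"

const (
    unpaddedChunk = 127 // bytes: 4 x 254 bits, the smallest whole-byte fr32 unit
    paddedChunk   = 128 // bytes: the same chunk after inserting 2 zero bits per 254 bits
)

// padded computes the fr32-padded size of an unpadded size that is a
// multiple of 127 bytes: every 127-byte chunk grows by exactly one byte.
func padded(unpadded uint64) uint64 {
    return unpadded + unpadded/unpaddedChunk
}

func main() {
    fmt.Println(padded(unpaddedChunk))  // 128
    fmt.Println(padded(127 * 8 * 1024)) // 1048576: an "unpadded MiB" pads out to exactly 1 MiB
}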
var MTTresh = uint64(512 << 10)
func mtChunkCount(usz abi.PaddedPieceSize) uint64 {

View File

@ -13,6 +13,7 @@ import (
rlepluslazy "github.com/filecoin-project/go-bitfield/rle"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/lotus/lib/readerutil"
"github.com/filecoin-project/lotus/storage/sealer/fsutil"
"github.com/filecoin-project/lotus/storage/sealer/storiface"
)
@ -249,7 +250,10 @@ func (pf *PartialFile) Free(offset storiface.PaddedByteIndex, size abi.PaddedPie
return nil
}
func (pf *PartialFile) Reader(offset storiface.PaddedByteIndex, size abi.PaddedPieceSize) (*os.File, error) {
// Reader forks off a new reader from the underlying file, and returns a reader
// starting at the given offset and reading the given size. Safe for concurrent
// use.
func (pf *PartialFile) Reader(offset storiface.PaddedByteIndex, size abi.PaddedPieceSize) (io.Reader, error) {
if _, err := pf.file.Seek(int64(offset), io.SeekStart); err != nil {
return nil, xerrors.Errorf("seek piece start: %w", err)
}
@ -275,7 +279,7 @@ func (pf *PartialFile) Reader(offset storiface.PaddedByteIndex, size abi.PaddedP
}
}
return pf.file, nil
return io.LimitReader(readerutil.NewReadSeekerFromReaderAt(pf.file, int64(offset)), int64(size)), nil
}
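readerutil.NewReadSeekerFromReaderAt isn't shown in this diff, but the shape of the change is: instead of seeking the shared *os.File (so concurrent readers raced on the file position), each call now gets its own bounded reader built on ReadAt. The standard library's io.NewSectionReader gives roughly the same behaviour; a sketch:

package main

import (
    "fmt"
    "io"
    "os"
)

// windowReader returns an independent reader over f covering
// [offset, offset+size). Because it only uses f.ReadAt, it never touches
// the shared seek position, so multiple windows can be read concurrently.
func windowReader(f *os.File, offset, size int64) io.Reader {
    return io.NewSectionReader(f, offset, size)
}

func main() {
    f, err := os.CreateTemp("", "window-demo")
    if err != nil {
        panic(err)
    }
    defer os.Remove(f.Name())
    defer f.Close()
    if _, err := f.WriteString("hello partial file"); err != nil {
        panic(err)
    }

    a := windowReader(f, 0, 5) // "hello"
    b := windowReader(f, 6, 7) // "partial"

    bufA, _ := io.ReadAll(a)
    bufB, _ := io.ReadAll(b)
    fmt.Println(string(bufA), string(bufB))
}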
func (pf *PartialFile) Allocated() (rlepluslazy.RunIterator, error) {

View File

@ -4,8 +4,10 @@ import (
"bufio"
"context"
"io"
"sync"
"github.com/ipfs/go-cid"
pool "github.com/libp2p/go-buffer-pool"
"golang.org/x/xerrors"
"github.com/filecoin-project/dagstore/mount"
@ -71,7 +73,7 @@ func (p *pieceProvider) IsUnsealed(ctx context.Context, sector storiface.SectorR
// It will NOT try to schedule an Unseal of a sealed sector file for the read.
//
// Returns a nil reader if the piece does NOT exist in any unsealed file or there is no unsealed file for the given sector on any of the workers.
func (p *pieceProvider) tryReadUnsealedPiece(ctx context.Context, pc cid.Cid, sector storiface.SectorRef, pieceOffset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize) (mount.Reader, error) {
func (p *pieceProvider) tryReadUnsealedPiece(ctx context.Context, pc cid.Cid, sector storiface.SectorRef, pieceOffset storiface.UnpaddedByteIndex, pieceSize abi.UnpaddedPieceSize) (mount.Reader, error) {
// acquire a lock purely for reading unsealed sectors
ctx, cancel := context.WithCancel(ctx)
if err := p.index.StorageLock(ctx, sector.ID, storiface.FTUnsealed, storiface.FTNone); err != nil {
@ -82,30 +84,37 @@ func (p *pieceProvider) tryReadUnsealedPiece(ctx context.Context, pc cid.Cid, se
// Reader returns a reader getter for an unsealed piece at the given offset in the given sector.
// The returned reader will be nil if none of the workers has an unsealed sector file containing
// the unsealed piece.
rg, err := p.storage.Reader(ctx, sector, abi.PaddedPieceSize(pieceOffset.Padded()), size.Padded())
readerGetter, err := p.storage.Reader(ctx, sector, abi.PaddedPieceSize(pieceOffset.Padded()), pieceSize.Padded())
if err != nil {
cancel()
log.Debugf("did not get storage reader;sector=%+v, err:%s", sector.ID, err)
return nil, err
}
if rg == nil {
if readerGetter == nil {
cancel()
return nil, nil
}
buf := make([]byte, fr32.BufSize(size.Padded()))
pr, err := (&pieceReader{
ctx: ctx,
getReader: func(ctx context.Context, startOffset uint64) (io.ReadCloser, error) {
startOffsetAligned := storiface.UnpaddedByteIndex(startOffset / 127 * 127) // floor to multiple of 127
getReader: func(startOffset, readSize uint64) (io.ReadCloser, error) {
// The request is for unpadded bytes, at any offset.
// readers returned by storage.Reader give us fr32-padded bytes, so we need to
// do the unpadding here.
r, err := rg(startOffsetAligned.Padded())
startOffsetAligned := storiface.UnpaddedFloor(startOffset)
startOffsetDiff := int(startOffset - uint64(startOffsetAligned))
endOffset := startOffset + readSize
endOffsetAligned := storiface.UnpaddedCeil(endOffset)
r, err := readerGetter(startOffsetAligned.Padded(), endOffsetAligned.Padded())
if err != nil {
return nil, xerrors.Errorf("getting reader at +%d: %w", startOffsetAligned, err)
}
upr, err := fr32.NewUnpadReaderBuf(r, size.Padded(), buf)
buf := pool.Get(fr32.BufSize(pieceSize.Padded()))
upr, err := fr32.NewUnpadReaderBuf(r, pieceSize.Padded(), buf)
if err != nil {
r.Close() // nolint
return nil, xerrors.Errorf("creating unpadded reader: %w", err)
@ -113,26 +122,31 @@ func (p *pieceProvider) tryReadUnsealedPiece(ctx context.Context, pc cid.Cid, se
bir := bufio.NewReaderSize(upr, 127)
if startOffset > uint64(startOffsetAligned) {
if _, err := bir.Discard(int(startOffset - uint64(startOffsetAligned))); err != nil {
if _, err := bir.Discard(startOffsetDiff); err != nil {
r.Close() // nolint
return nil, xerrors.Errorf("discarding bytes for startOffset: %w", err)
}
}
var closeOnce sync.Once
return struct {
io.Reader
io.Closer
}{
Reader: bir,
Closer: funcCloser(func() error {
closeOnce.Do(func() {
pool.Put(buf)
})
return r.Close()
}),
}, nil
},
len: size,
len: pieceSize,
onClose: cancel,
pieceCid: pc,
}).init()
}).init(ctx)
if err != nil || pr == nil { // pr == nil to make sure we don't return typed nil
cancel()
return nil, err
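Putting the floor/ceil alignment together with the padded translation, a short worked example (pure arithmetic with illustrative offsets; the padded formula is the usual n + n/127, valid for multiples of 127):

package main

import "fmt"

const chunk = 127

func floor(n uint64) uint64  { return n / chunk * chunk }
func ceil(n uint64) uint64   { return (n + chunk - 1) / chunk * chunk }
func padded(n uint64) uint64 { return n + n/chunk } // valid for multiples of 127

func main() {
    startOffset, readSize := uint64(1000), uint64(4000)

    // An unpadded request for [1000, 5000) is widened to chunk boundaries
    // before being translated into the padded range handed to readerGetter.
    startAligned := floor(startOffset)         // 889  (7 * 127)
    endAligned := ceil(startOffset + readSize) // 5080 (40 * 127)

    fmt.Println(padded(startAligned))       // 896  -> padded start for the reader getter
    fmt.Println(padded(endAligned))         // 5120 -> padded end for the reader getter
    fmt.Println(startOffset - startAligned) // 111  -> unpadded bytes discarded after unpadding
}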

View File

@ -6,8 +6,10 @@ import (
"io"
"sync"
lru "github.com/hashicorp/golang-lru/v2"
"github.com/ipfs/go-cid"
"go.opencensus.io/stats"
"go.opencensus.io/tag"
"golang.org/x/xerrors"
"github.com/filecoin-project/dagstore/mount"
@ -21,29 +23,48 @@ import (
var MaxPieceReaderBurnBytes int64 = 1 << 20 // 1M
var ReadBuf = 128 * (127 * 8) // unpadded(128k)
type pieceGetter func(ctx context.Context, offset uint64) (io.ReadCloser, error)
var MinRandomReadSize = int64(4 << 10)
type pieceGetter func(offset, size uint64) (io.ReadCloser, error)
type pieceReader struct {
ctx context.Context
getReader pieceGetter
pieceCid cid.Cid
len abi.UnpaddedPieceSize
onClose context.CancelFunc
seqMCtx context.Context
atMCtx context.Context
closed bool
seqAt int64 // next byte to be read by io.Reader
mu sync.Mutex
r io.ReadCloser
br *bufio.Reader
rAt int64
// sequential reader
seqMu sync.Mutex
r io.ReadCloser
br *bufio.Reader
rAt int64
// random read cache
remReads *lru.Cache[int64, []byte] // data start offset -> data
// todo try carrying a "bytes read sequentially so far" counter with those
// cached byte buffers, increase buffer sizes when we see that we're doing
// a long sequential read
}
func (p *pieceReader) init() (_ *pieceReader, err error) {
stats.Record(p.ctx, metrics.DagStorePRInitCount.M(1))
func (p *pieceReader) init(ctx context.Context) (_ *pieceReader, err error) {
stats.Record(ctx, metrics.DagStorePRInitCount.M(1))
p.seqMCtx, _ = tag.New(ctx, tag.Upsert(metrics.PRReadType, "seq"))
p.atMCtx, _ = tag.New(ctx, tag.Upsert(metrics.PRReadType, "rand"))
p.remReads, err = lru.New[int64, []byte](100)
if err != nil {
return nil, err
}
p.rAt = 0
p.r, err = p.getReader(p.ctx, uint64(p.rAt))
p.r, err = p.getReader(uint64(p.rAt), uint64(p.len))
if err != nil {
return nil, err
}
@ -65,17 +86,14 @@ func (p *pieceReader) check() error {
}
func (p *pieceReader) Close() error {
p.mu.Lock()
defer p.mu.Unlock()
p.seqMu.Lock()
defer p.seqMu.Unlock()
if err := p.check(); err != nil {
return err
}
if p.r != nil {
if err := p.r.Close(); err != nil {
return err
}
if err := p.r.Close(); err != nil {
return err
}
@ -90,21 +108,21 @@ func (p *pieceReader) Close() error {
}
func (p *pieceReader) Read(b []byte) (int, error) {
p.mu.Lock()
defer p.mu.Unlock()
p.seqMu.Lock()
defer p.seqMu.Unlock()
if err := p.check(); err != nil {
return 0, err
}
n, err := p.readAtUnlocked(b, p.seqAt)
n, err := p.readSeqReader(b)
p.seqAt += int64(n)
return n, err
}
func (p *pieceReader) Seek(offset int64, whence int) (int64, error) {
p.mu.Lock()
defer p.mu.Unlock()
p.seqMu.Lock()
defer p.seqMu.Unlock()
if err := p.check(); err != nil {
return 0, err
@ -124,19 +142,14 @@ func (p *pieceReader) Seek(offset int64, whence int) (int64, error) {
return p.seqAt, nil
}
func (p *pieceReader) ReadAt(b []byte, off int64) (n int, err error) {
p.mu.Lock()
defer p.mu.Unlock()
func (p *pieceReader) readSeqReader(b []byte) (n int, err error) {
off := p.seqAt
return p.readAtUnlocked(b, off)
}
func (p *pieceReader) readAtUnlocked(b []byte, off int64) (n int, err error) {
if err := p.check(); err != nil {
return 0, err
}
stats.Record(p.ctx, metrics.DagStorePRBytesRequested.M(int64(len(b))))
stats.Record(p.seqMCtx, metrics.DagStorePRBytesRequested.M(int64(len(b))))
// 1. Get the backing reader into the correct position
@ -154,13 +167,13 @@ func (p *pieceReader) readAtUnlocked(b []byte, off int64) (n int, err error) {
log.Debugw("pieceReader new stream", "piece", p.pieceCid, "at", p.rAt, "off", off-p.rAt, "n", len(b))
if off > p.rAt {
stats.Record(p.ctx, metrics.DagStorePRSeekForwardBytes.M(off-p.rAt), metrics.DagStorePRSeekForwardCount.M(1))
stats.Record(p.seqMCtx, metrics.DagStorePRSeekForwardBytes.M(off-p.rAt), metrics.DagStorePRSeekForwardCount.M(1))
} else {
stats.Record(p.ctx, metrics.DagStorePRSeekBackBytes.M(p.rAt-off), metrics.DagStorePRSeekBackCount.M(1))
stats.Record(p.seqMCtx, metrics.DagStorePRSeekBackBytes.M(p.rAt-off), metrics.DagStorePRSeekBackCount.M(1))
}
p.rAt = off
p.r, err = p.getReader(p.ctx, uint64(p.rAt))
p.r, err = p.getReader(uint64(p.rAt), uint64(p.len))
p.br = bufio.NewReaderSize(p.r, ReadBuf)
if err != nil {
return 0, xerrors.Errorf("getting backing reader: %w", err)
@ -169,7 +182,7 @@ func (p *pieceReader) readAtUnlocked(b []byte, off int64) (n int, err error) {
// 2. Check if we need to burn some bytes
if off > p.rAt {
stats.Record(p.ctx, metrics.DagStorePRBytesDiscarded.M(off-p.rAt), metrics.DagStorePRDiscardCount.M(1))
stats.Record(p.seqMCtx, metrics.DagStorePRBytesDiscarded.M(off-p.rAt), metrics.DagStorePRDiscardCount.M(1))
n, err := io.CopyN(io.Discard, p.br, off-p.rAt)
p.rAt += n
@ -196,4 +209,99 @@ func (p *pieceReader) readAtUnlocked(b []byte, off int64) (n int, err error) {
return n, err
}
func (p *pieceReader) ReadAt(b []byte, off int64) (n int, err error) {
stats.Record(p.atMCtx, metrics.DagStorePRBytesRequested.M(int64(len(b))))
var filled int64
// try to get a buf from lru
data, ok := p.remReads.Get(off)
if ok {
n = copy(b, data)
filled += int64(n)
if n < len(data) {
p.remReads.Add(off+int64(n), data[n:])
// keep the header buffered
if off != 0 {
p.remReads.Remove(off)
}
}
stats.Record(p.atMCtx, metrics.DagStorePRAtHitBytes.M(int64(n)), metrics.DagStorePRAtHitCount.M(1))
// dagstore/pr_at_hit_bytes, dagstore/pr_at_hit_count
}
if filled == int64(len(b)) {
// dagstore/pr_at_cache_fill_count
stats.Record(p.atMCtx, metrics.DagStorePRAtCacheFillCount.M(1))
return n, nil
}
readOff := off + filled
readSize := int64(len(b)) - filled
smallRead := readSize < MinRandomReadSize
if smallRead {
// read into small read buf
readBuf := make([]byte, MinRandomReadSize)
bn, err := p.readInto(readBuf, readOff)
if err != nil && err != io.EOF {
return int(filled), err
}
_ = stats.RecordWithTags(p.atMCtx, []tag.Mutator{tag.Insert(metrics.PRReadSize, "small")}, metrics.DagStorePRAtReadBytes.M(int64(bn)), metrics.DagStorePRAtReadCount.M(1))
// reslice so that the slice is the data
readBuf = readBuf[:bn]
// fill user data
used := copy(b[filled:], readBuf[:])
filled += int64(used)
readBuf = readBuf[used:]
// cache the rest
if len(readBuf) > 0 {
p.remReads.Add(readOff+int64(used), readBuf)
}
} else {
// read into user buf
bn, err := p.readInto(b[filled:], readOff)
if err != nil {
return int(filled), err
}
filled += int64(bn)
_ = stats.RecordWithTags(p.atMCtx, []tag.Mutator{tag.Insert(metrics.PRReadSize, "big")}, metrics.DagStorePRAtReadBytes.M(int64(bn)), metrics.DagStorePRAtReadCount.M(1))
}
if filled < int64(len(b)) {
return int(filled), io.EOF
}
return int(filled), nil
}
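ReadAt above layers an LRU of leftover byte ranges on top of the reader getter so that many tiny random reads don't each open a fresh ranged reader. A stripped-down sketch of the same idea (a plain map in place of golang-lru, io.ReaderAt as the backing source, no metrics and no header special-casing):

package main

import (
    "bytes"
    "fmt"
    "io"
)

const minRandomReadSize = 4 << 10 // stands in for MinRandomReadSize

// cachedRandReader amplifies small random reads against an expensive backing
// ReaderAt and remembers the unused tail keyed by its offset, so a follow-up
// read starting there is served from memory instead of a new ranged reader.
type cachedRandReader struct {
    backing io.ReaderAt
    cache   map[int64][]byte // start offset -> leftover bytes
}

func (c *cachedRandReader) ReadAt(b []byte, off int64) (int, error) {
    // 1. cache hit: serve what we have, re-key any non-empty remainder
    if data, ok := c.cache[off]; ok {
        n := copy(b, data)
        delete(c.cache, off)
        if n < len(data) {
            c.cache[off+int64(n)] = data[n:]
        }
        if n == len(b) {
            return n, nil
        }
        m, err := c.ReadAt(b[n:], off+int64(n)) // fetch the rest
        return n + m, err
    }

    // 2. small read: fetch a bigger chunk, serve the prefix, cache the tail
    if len(b) < minRandomReadSize {
        buf := make([]byte, minRandomReadSize)
        bn, err := c.backing.ReadAt(buf, off)
        if err != nil && err != io.EOF {
            return 0, err
        }
        buf = buf[:bn]
        n := copy(b, buf)
        if n < bn {
            c.cache[off+int64(n)] = buf[n:]
        }
        if n < len(b) {
            return n, io.EOF
        }
        return n, nil
    }

    // 3. big read: go straight to the backing reader
    return c.backing.ReadAt(b, off)
}

func main() {
    data := make([]byte, 16<<10)
    for i := range data {
        data[i] = byte(i)
    }
    r := &cachedRandReader{backing: bytes.NewReader(data), cache: map[int64][]byte{}}

    small := make([]byte, 64)
    r.ReadAt(small, 100) // backing read of 4 KiB, tail cached at offset 164
    r.ReadAt(small, 164) // served entirely from the cache
    fmt.Println(small[0] == data[164])
}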
func (p *pieceReader) readInto(b []byte, off int64) (n int, err error) {
rd, err := p.getReader(uint64(off), uint64(len(b)))
if err != nil {
return 0, xerrors.Errorf("getting reader: %w", err)
}
n, err = io.ReadFull(rd, b)
cerr := rd.Close()
if err == io.ErrUnexpectedEOF {
err = io.EOF
}
if err != nil {
return n, err
}
return n, cerr
}
var _ mount.Reader = (*pieceReader)(nil)

View File

@ -8,6 +8,8 @@ import (
"golang.org/x/xerrors"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/lotus/storage/sealer/fr32"
)
var ErrSectorNotFound = errors.New("sector not found")
@ -26,6 +28,14 @@ func (i UnpaddedByteIndex) Valid() error {
return nil
}
func UnpaddedFloor(n uint64) UnpaddedByteIndex {
return UnpaddedByteIndex(n / uint64(fr32.UnpaddedFr32Chunk) * uint64(fr32.UnpaddedFr32Chunk))
}
func UnpaddedCeil(n uint64) UnpaddedByteIndex {
return UnpaddedByteIndex((n + uint64(fr32.UnpaddedFr32Chunk-1)) / uint64(fr32.UnpaddedFr32Chunk) * uint64(fr32.UnpaddedFr32Chunk))
}
type PaddedByteIndex uint64
type RGetter func(ctx context.Context, id abi.SectorID) (sealed cid.Cid, update bool, err error)