diff --git a/api/api_gateway.go b/api/api_gateway.go index 0fa329724..97116d345 100644 --- a/api/api_gateway.go +++ b/api/api_gateway.go @@ -33,6 +33,9 @@ import ( // * Generate openrpc blobs type Gateway interface { + MpoolPending(context.Context, types.TipSetKey) ([]*types.SignedMessage, error) + ChainGetBlock(context.Context, cid.Cid) (*types.BlockHeader, error) + MinerGetBaseInfo(context.Context, address.Address, abi.ChainEpoch, types.TipSetKey) (*MiningBaseInfo, error) StateMinerSectorCount(context.Context, address.Address, types.TipSetKey) (MinerSectors, error) GasEstimateGasPremium(context.Context, uint64, address.Address, int64, types.TipSetKey) (types.BigInt, error) StateReplay(context.Context, types.TipSetKey, cid.Cid) (*InvocResult, error) diff --git a/api/docgen/docgen.go b/api/docgen/docgen.go index 7a9993bb7..5c1d43129 100644 --- a/api/docgen/docgen.go +++ b/api/docgen/docgen.go @@ -86,6 +86,7 @@ func init() { } ExampleValues[reflect.TypeOf(addr)] = addr + ExampleValues[reflect.TypeOf(&addr)] = &addr pid, err := peer.Decode("12D3KooWGzxzKZYveHXtpG6AsrUJBcWxHBFS2HsEoGTxrMLvKXtf") if err != nil { diff --git a/api/proxy_gen.go b/api/proxy_gen.go index 459fd5864..f2caab266 100644 --- a/api/proxy_gen.go +++ b/api/proxy_gen.go @@ -632,6 +632,8 @@ type GatewayStruct struct { } type GatewayMethods struct { + ChainGetBlock func(p0 context.Context, p1 cid.Cid) (*types.BlockHeader, error) `` + ChainGetBlockMessages func(p0 context.Context, p1 cid.Cid) (*BlockMessages, error) `` ChainGetGenesis func(p0 context.Context) (*types.TipSet, error) `` @@ -734,8 +736,12 @@ type GatewayMethods struct { GasEstimateMessageGas func(p0 context.Context, p1 *types.Message, p2 *MessageSendSpec, p3 types.TipSetKey) (*types.Message, error) `` + MinerGetBaseInfo func(p0 context.Context, p1 address.Address, p2 abi.ChainEpoch, p3 types.TipSetKey) (*MiningBaseInfo, error) `` + MpoolGetNonce func(p0 context.Context, p1 address.Address) (uint64, error) `` + MpoolPending func(p0 context.Context, p1 types.TipSetKey) ([]*types.SignedMessage, error) `` + MpoolPush func(p0 context.Context, p1 *types.SignedMessage) (cid.Cid, error) `` MsigGetAvailableBalance func(p0 context.Context, p1 address.Address, p2 types.TipSetKey) (types.BigInt, error) `` @@ -4109,6 +4115,17 @@ func (s *FullNodeStub) Web3ClientVersion(p0 context.Context) (string, error) { return "", ErrNotSupported } +func (s *GatewayStruct) ChainGetBlock(p0 context.Context, p1 cid.Cid) (*types.BlockHeader, error) { + if s.Internal.ChainGetBlock == nil { + return nil, ErrNotSupported + } + return s.Internal.ChainGetBlock(p0, p1) +} + +func (s *GatewayStub) ChainGetBlock(p0 context.Context, p1 cid.Cid) (*types.BlockHeader, error) { + return nil, ErrNotSupported +} + func (s *GatewayStruct) ChainGetBlockMessages(p0 context.Context, p1 cid.Cid) (*BlockMessages, error) { if s.Internal.ChainGetBlockMessages == nil { return nil, ErrNotSupported @@ -4670,6 +4687,17 @@ func (s *GatewayStub) GasEstimateMessageGas(p0 context.Context, p1 *types.Messag return nil, ErrNotSupported } +func (s *GatewayStruct) MinerGetBaseInfo(p0 context.Context, p1 address.Address, p2 abi.ChainEpoch, p3 types.TipSetKey) (*MiningBaseInfo, error) { + if s.Internal.MinerGetBaseInfo == nil { + return nil, ErrNotSupported + } + return s.Internal.MinerGetBaseInfo(p0, p1, p2, p3) +} + +func (s *GatewayStub) MinerGetBaseInfo(p0 context.Context, p1 address.Address, p2 abi.ChainEpoch, p3 types.TipSetKey) (*MiningBaseInfo, error) { + return nil, ErrNotSupported +} + func (s *GatewayStruct) MpoolGetNonce(p0 context.Context, p1 address.Address) (uint64, error) { if s.Internal.MpoolGetNonce == nil { return 0, ErrNotSupported @@ -4681,6 +4709,17 @@ func (s *GatewayStub) MpoolGetNonce(p0 context.Context, p1 address.Address) (uin return 0, ErrNotSupported } +func (s *GatewayStruct) MpoolPending(p0 context.Context, p1 types.TipSetKey) ([]*types.SignedMessage, error) { + if s.Internal.MpoolPending == nil { + return *new([]*types.SignedMessage), ErrNotSupported + } + return s.Internal.MpoolPending(p0, p1) +} + +func (s *GatewayStub) MpoolPending(p0 context.Context, p1 types.TipSetKey) ([]*types.SignedMessage, error) { + return *new([]*types.SignedMessage), ErrNotSupported +} + func (s *GatewayStruct) MpoolPush(p0 context.Context, p1 *types.SignedMessage) (cid.Cid, error) { if s.Internal.MpoolPush == nil { return *new(cid.Cid), ErrNotSupported diff --git a/api/types.go b/api/types.go index 625b770c5..8db5120a8 100644 --- a/api/types.go +++ b/api/types.go @@ -299,6 +299,7 @@ type MinerInfo struct { SectorSize abi.SectorSize WindowPoStPartitionSectors uint64 ConsensusFaultElapsed abi.ChainEpoch + PendingOwnerAddress *address.Address Beneficiary address.Address BeneficiaryTerm *miner.BeneficiaryTerm PendingBeneficiaryTerm *miner.PendingBeneficiaryChange diff --git a/api/v0api/gateway.go b/api/v0api/gateway.go index 30eb0d1c4..9f6c54fa9 100644 --- a/api/v0api/gateway.go +++ b/api/v0api/gateway.go @@ -35,6 +35,9 @@ import ( // * Generate openrpc blobs type Gateway interface { + MpoolPending(context.Context, types.TipSetKey) ([]*types.SignedMessage, error) + ChainGetBlock(context.Context, cid.Cid) (*types.BlockHeader, error) + MinerGetBaseInfo(context.Context, address.Address, abi.ChainEpoch, types.TipSetKey) (*api.MiningBaseInfo, error) StateMinerSectorCount(context.Context, address.Address, types.TipSetKey) (api.MinerSectors, error) GasEstimateGasPremium(context.Context, uint64, address.Address, int64, types.TipSetKey) (types.BigInt, error) StateReplay(context.Context, types.TipSetKey, cid.Cid) (*api.InvocResult, error) diff --git a/api/v0api/proxy_gen.go b/api/v0api/proxy_gen.go index 069527ae8..29f6f6773 100644 --- a/api/v0api/proxy_gen.go +++ b/api/v0api/proxy_gen.go @@ -431,6 +431,8 @@ type GatewayStruct struct { } type GatewayMethods struct { + ChainGetBlock func(p0 context.Context, p1 cid.Cid) (*types.BlockHeader, error) `` + ChainGetBlockMessages func(p0 context.Context, p1 cid.Cid) (*api.BlockMessages, error) `` ChainGetMessage func(p0 context.Context, p1 cid.Cid) (*types.Message, error) `` @@ -453,8 +455,12 @@ type GatewayMethods struct { GasEstimateMessageGas func(p0 context.Context, p1 *types.Message, p2 *api.MessageSendSpec, p3 types.TipSetKey) (*types.Message, error) `` + MinerGetBaseInfo func(p0 context.Context, p1 address.Address, p2 abi.ChainEpoch, p3 types.TipSetKey) (*api.MiningBaseInfo, error) `` + MpoolGetNonce func(p0 context.Context, p1 address.Address) (uint64, error) `` + MpoolPending func(p0 context.Context, p1 types.TipSetKey) ([]*types.SignedMessage, error) `` + MpoolPush func(p0 context.Context, p1 *types.SignedMessage) (cid.Cid, error) `` MsigGetAvailableBalance func(p0 context.Context, p1 address.Address, p2 types.TipSetKey) (types.BigInt, error) `` @@ -2581,6 +2587,17 @@ func (s *FullNodeStub) WalletVerify(p0 context.Context, p1 address.Address, p2 [ return false, ErrNotSupported } +func (s *GatewayStruct) ChainGetBlock(p0 context.Context, p1 cid.Cid) (*types.BlockHeader, error) { + if s.Internal.ChainGetBlock == nil { + return nil, ErrNotSupported + } + return s.Internal.ChainGetBlock(p0, p1) +} + +func (s *GatewayStub) ChainGetBlock(p0 context.Context, p1 cid.Cid) (*types.BlockHeader, error) { + return nil, ErrNotSupported +} + func (s *GatewayStruct) ChainGetBlockMessages(p0 context.Context, p1 cid.Cid) (*api.BlockMessages, error) { if s.Internal.ChainGetBlockMessages == nil { return nil, ErrNotSupported @@ -2702,6 +2719,17 @@ func (s *GatewayStub) GasEstimateMessageGas(p0 context.Context, p1 *types.Messag return nil, ErrNotSupported } +func (s *GatewayStruct) MinerGetBaseInfo(p0 context.Context, p1 address.Address, p2 abi.ChainEpoch, p3 types.TipSetKey) (*api.MiningBaseInfo, error) { + if s.Internal.MinerGetBaseInfo == nil { + return nil, ErrNotSupported + } + return s.Internal.MinerGetBaseInfo(p0, p1, p2, p3) +} + +func (s *GatewayStub) MinerGetBaseInfo(p0 context.Context, p1 address.Address, p2 abi.ChainEpoch, p3 types.TipSetKey) (*api.MiningBaseInfo, error) { + return nil, ErrNotSupported +} + func (s *GatewayStruct) MpoolGetNonce(p0 context.Context, p1 address.Address) (uint64, error) { if s.Internal.MpoolGetNonce == nil { return 0, ErrNotSupported @@ -2713,6 +2741,17 @@ func (s *GatewayStub) MpoolGetNonce(p0 context.Context, p1 address.Address) (uin return 0, ErrNotSupported } +func (s *GatewayStruct) MpoolPending(p0 context.Context, p1 types.TipSetKey) ([]*types.SignedMessage, error) { + if s.Internal.MpoolPending == nil { + return *new([]*types.SignedMessage), ErrNotSupported + } + return s.Internal.MpoolPending(p0, p1) +} + +func (s *GatewayStub) MpoolPending(p0 context.Context, p1 types.TipSetKey) ([]*types.SignedMessage, error) { + return *new([]*types.SignedMessage), ErrNotSupported +} + func (s *GatewayStruct) MpoolPush(p0 context.Context, p1 *types.SignedMessage) (cid.Cid, error) { if s.Internal.MpoolPush == nil { return *new(cid.Cid), ErrNotSupported diff --git a/build/openrpc/full.json.gz b/build/openrpc/full.json.gz index 8e12e7ed3..57f88c609 100644 Binary files a/build/openrpc/full.json.gz and b/build/openrpc/full.json.gz differ diff --git a/build/openrpc/gateway.json.gz b/build/openrpc/gateway.json.gz index 2fa38e648..abf3dc973 100644 Binary files a/build/openrpc/gateway.json.gz and b/build/openrpc/gateway.json.gz differ diff --git a/build/openrpc/miner.json.gz b/build/openrpc/miner.json.gz index 164434f3b..d403bd185 100644 Binary files a/build/openrpc/miner.json.gz and b/build/openrpc/miner.json.gz differ diff --git a/build/openrpc/worker.json.gz b/build/openrpc/worker.json.gz index f56329b05..0cec0db81 100644 Binary files a/build/openrpc/worker.json.gz and b/build/openrpc/worker.json.gz differ diff --git a/chain/actors/policy/policy.go b/chain/actors/policy/policy.go index 4b90c46a0..bf982af89 100644 --- a/chain/actors/policy/policy.go +++ b/chain/actors/policy/policy.go @@ -61,6 +61,10 @@ const ( MaxPreCommitRandomnessLookback = builtin11.EpochsInDay + SealRandomnessLookback ) +var ( + MarketDefaultAllocationTermBuffer = market11.MarketDefaultAllocationTermBuffer +) + // SetSupportedProofTypes sets supported proof types, across all actor versions. // This should only be used for testing. func SetSupportedProofTypes(types ...abi.RegisteredSealProof) { diff --git a/chain/actors/policy/policy.go.template b/chain/actors/policy/policy.go.template index f5178500a..3eb39836a 100644 --- a/chain/actors/policy/policy.go.template +++ b/chain/actors/policy/policy.go.template @@ -39,6 +39,10 @@ const ( MaxPreCommitRandomnessLookback = builtin{{.latestVersion}}.EpochsInDay + SealRandomnessLookback ) +var ( + MarketDefaultAllocationTermBuffer = market{{.latestVersion}}.MarketDefaultAllocationTermBuffer +) + // SetSupportedProofTypes sets supported proof types, across all actor versions. // This should only be used for testing. func SetSupportedProofTypes(types ...abi.RegisteredSealProof) { diff --git a/chain/gen/slashfilter/slashfilter.go b/chain/gen/slashfilter/slashfilter.go index e23c601df..89ddd90b1 100644 --- a/chain/gen/slashfilter/slashfilter.go +++ b/chain/gen/slashfilter/slashfilter.go @@ -11,7 +11,6 @@ import ( "github.com/filecoin-project/go-state-types/abi" - "github.com/filecoin-project/lotus/build" "github.com/filecoin-project/lotus/chain/types" ) @@ -28,10 +27,6 @@ func New(dstore ds.Batching) *SlashFilter { } func (f *SlashFilter) CheckBlock(ctx context.Context, bh *types.BlockHeader, parentEpoch abi.ChainEpoch) (cid.Cid, error) { - if build.IsNearUpgrade(bh.Height, build.UpgradeOrangeHeight) { - return cid.Undef, nil - } - epochKey := ds.NewKey(fmt.Sprintf("/%s/%d", bh.Miner, bh.Height)) { // double-fork mining (2 blocks at one epoch) diff --git a/chain/messagepool/messagepool_test.go b/chain/messagepool/messagepool_test.go index a781b5074..3c6800d7b 100644 --- a/chain/messagepool/messagepool_test.go +++ b/chain/messagepool/messagepool_test.go @@ -11,9 +11,11 @@ import ( "github.com/ipfs/go-datastore" logging "github.com/ipfs/go-log/v2" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "github.com/filecoin-project/go-address" "github.com/filecoin-project/go-state-types/abi" + big2 "github.com/filecoin-project/go-state-types/big" "github.com/filecoin-project/go-state-types/crypto" "github.com/filecoin-project/go-state-types/network" builtin2 "github.com/filecoin-project/specs-actors/v2/actors/builtin" @@ -524,6 +526,36 @@ func TestPruningSimple(t *testing.T) { } } +func TestGasRewardNegative(t *testing.T) { + var mp MessagePool + + msg := types.SignedMessage{ + Message: types.Message{ + GasLimit: 1000, + GasFeeCap: big2.NewInt(20000), + GasPremium: big2.NewInt(15000), + }, + } + baseFee := big2.NewInt(30000) + // Over the GasPremium, but under the BaseFee + gr1 := mp.getGasReward(&msg, baseFee) + + msg.Message.GasFeeCap = big2.NewInt(15000) + // Equal to GasPremium, under the BaseFee + gr2 := mp.getGasReward(&msg, baseFee) + + msg.Message.GasFeeCap = big2.NewInt(10000) + // Under both GasPremium and BaseFee + gr3 := mp.getGasReward(&msg, baseFee) + + require.True(t, gr1.Sign() < 0) + require.True(t, gr2.Sign() < 0) + require.True(t, gr3.Sign() < 0) + + require.True(t, gr1.Cmp(gr2) > 0) + require.True(t, gr2.Cmp(gr3) > 0) +} + func TestLoadLocal(t *testing.T) { tma := newTestMpoolAPI() ds := datastore.NewMapDatastore() diff --git a/chain/messagepool/selection_test.go b/chain/messagepool/selection_test.go index c3a5c6d6f..17e0f34f4 100644 --- a/chain/messagepool/selection_test.go +++ b/chain/messagepool/selection_test.go @@ -13,6 +13,7 @@ import ( "os" "sort" "testing" + "time" "github.com/ipfs/go-cid" "github.com/ipfs/go-datastore" @@ -1690,3 +1691,188 @@ readLoop: } } + +func TestRealWorldSelectionTiming(t *testing.T) { + //stm: @TOKEN_WALLET_NEW_001, @TOKEN_WALLET_SIGN_001, @CHAIN_MEMPOOL_SELECT_001 + + // load test-messages.json.gz and rewrite the messages so that + // 1) we map each real actor to a test actor so that we can sign the messages + // 2) adjust the nonces so that they start from 0 + file, err := os.Open("test-messages2.json.gz") + if err != nil { + t.Fatal(err) + } + + gzr, err := gzip.NewReader(file) + if err != nil { + t.Fatal(err) + } + + dec := json.NewDecoder(gzr) + + var msgs []*types.SignedMessage + baseNonces := make(map[address.Address]uint64) + +readLoop: + for { + m := new(types.SignedMessage) + err := dec.Decode(m) + switch err { + case nil: + msgs = append(msgs, m) + nonce, ok := baseNonces[m.Message.From] + if !ok || m.Message.Nonce < nonce { + baseNonces[m.Message.From] = m.Message.Nonce + } + + case io.EOF: + break readLoop + + default: + t.Fatal(err) + } + } + + actorMap := make(map[address.Address]address.Address) + actorWallets := make(map[address.Address]api.Wallet) + + for _, m := range msgs { + baseNonce := baseNonces[m.Message.From] + + localActor, ok := actorMap[m.Message.From] + if !ok { + w, err := wallet.NewWallet(wallet.NewMemKeyStore()) + if err != nil { + t.Fatal(err) + } + + a, err := w.WalletNew(context.Background(), types.KTSecp256k1) + if err != nil { + t.Fatal(err) + } + + actorMap[m.Message.From] = a + actorWallets[a] = w + localActor = a + } + + w, ok := actorWallets[localActor] + if !ok { + t.Fatalf("failed to lookup wallet for actor %s", localActor) + } + + m.Message.From = localActor + m.Message.Nonce -= baseNonce + + sig, err := w.WalletSign(context.TODO(), localActor, m.Message.Cid().Bytes(), api.MsgMeta{}) + if err != nil { + t.Fatal(err) + } + + m.Signature = *sig + } + + mp, tma := makeTestMpool() + + block := tma.nextBlockWithHeight(uint64(build.UpgradeHyggeHeight) + 10) + ts := mock.TipSet(block) + tma.applyBlock(t, block) + + for _, a := range actorMap { + tma.setBalance(a, 1000000) + } + + tma.baseFee = types.NewInt(800_000_000) + + sort.Slice(msgs, func(i, j int) bool { + return msgs[i].Message.Nonce < msgs[j].Message.Nonce + }) + + // add the messages + for _, m := range msgs { + mustAdd(t, mp, m) + } + + // do message selection and check block packing + minGasLimit := int64(0.9 * float64(build.BlockGasLimit)) + + // greedy first + start := time.Now() + selected, err := mp.SelectMessages(context.Background(), ts, 1.0) + if err != nil { + t.Fatal(err) + } + t.Logf("selected %d messages in %s", len(selected), time.Since(start)) + + gasLimit := int64(0) + for _, m := range selected { + gasLimit += m.Message.GasLimit + } + if gasLimit < minGasLimit { + t.Fatalf("failed to pack with tq=1.0; packed %d, minimum packing: %d", gasLimit, minGasLimit) + } + + // high quality ticket + start = time.Now() + selected, err = mp.SelectMessages(context.Background(), ts, .8) + if err != nil { + t.Fatal(err) + } + t.Logf("selected %d messages in %s", len(selected), time.Since(start)) + + gasLimit = int64(0) + for _, m := range selected { + gasLimit += m.Message.GasLimit + } + if gasLimit < minGasLimit { + t.Fatalf("failed to pack with tq=0.8; packed %d, minimum packing: %d", gasLimit, minGasLimit) + } + + // mid quality ticket + start = time.Now() + selected, err = mp.SelectMessages(context.Background(), ts, .4) + if err != nil { + t.Fatal(err) + } + t.Logf("selected %d messages in %s", len(selected), time.Since(start)) + + gasLimit = int64(0) + for _, m := range selected { + gasLimit += m.Message.GasLimit + } + if gasLimit < minGasLimit { + t.Fatalf("failed to pack with tq=0.4; packed %d, minimum packing: %d", gasLimit, minGasLimit) + } + + // low quality ticket + start = time.Now() + selected, err = mp.SelectMessages(context.Background(), ts, .1) + if err != nil { + t.Fatal(err) + } + t.Logf("selected %d messages in %s", len(selected), time.Since(start)) + + gasLimit = int64(0) + for _, m := range selected { + gasLimit += m.Message.GasLimit + } + if gasLimit < minGasLimit { + t.Fatalf("failed to pack with tq=0.1; packed %d, minimum packing: %d", gasLimit, minGasLimit) + } + + // very low quality ticket + start = time.Now() + selected, err = mp.SelectMessages(context.Background(), ts, .01) + if err != nil { + t.Fatal(err) + } + t.Logf("selected %d messages in %s", len(selected), time.Since(start)) + + gasLimit = int64(0) + for _, m := range selected { + gasLimit += m.Message.GasLimit + } + if gasLimit < minGasLimit { + t.Fatalf("failed to pack with tq=0.01; packed %d, minimum packing: %d", gasLimit, minGasLimit) + } +} diff --git a/chain/messagepool/test-messages2.json.gz b/chain/messagepool/test-messages2.json.gz new file mode 100644 index 000000000..9d2cfbfe4 Binary files /dev/null and b/chain/messagepool/test-messages2.json.gz differ diff --git a/cmd/lotus-bench/main.go b/cmd/lotus-bench/main.go index 12d310b65..883f27a42 100644 --- a/cmd/lotus-bench/main.go +++ b/cmd/lotus-bench/main.go @@ -106,6 +106,7 @@ func main() { sealBenchCmd, simpleCmd, importBenchCmd, + rpcCmd, }, } diff --git a/cmd/lotus-bench/rpc.go b/cmd/lotus-bench/rpc.go new file mode 100644 index 000000000..5da784c6e --- /dev/null +++ b/cmd/lotus-bench/rpc.go @@ -0,0 +1,576 @@ +package main + +import ( + "bytes" + "encoding/json" + "errors" + "fmt" + "io" + "net/http" + "os" + "os/signal" + "sort" + "strconv" + "strings" + "sync" + "text/tabwriter" + "time" + + "github.com/urfave/cli/v2" +) + +var rpcCmd = &cli.Command{ + Name: "rpc", + Usage: "Runs a concurrent stress test on one or more rpc methods and prints the performance metrics including latency distribution and histogram", + Description: `This benchmark is designed to stress test the rpc methods of a lotus node so that we can simulate real world usage and measure the performance of rpc methods on the node. + +This benchmark has the following features: +* Can query each method both sequentially and concurrently +* Supports rate limiting +* Can query multiple different endpoints at once (supporting different concurrency level and rate limiting for each method) +* Gives a nice reporting summary of the stress testing of each method (including latency distribution, histogram and more) +* Easy to use + +To use this benchmark you must specify the rpc methods you want to test using the --method options, the format of it is: + + --method=NAME[:CONCURRENCY][:QPS][:PARAMS] where only NAME is required. + +Here are some real examples: + lotus-bench rpc --method='eth_chainId' // run eth_chainId with default concurrency and qps + lotus-bench rpc --method='eth_chainId:3' // override concurrency to 3 + lotus-bench rpc --method='eth_chainId::100' // override to 100 qps while using default concurrency + lotus-bench rpc --method='eth_chainId:3:100' // run using 3 workers but limit to 100 qps + lotus-bench rpc --method='eth_getTransactionCount:::["0xd4c70007F3F502f212c7e6794b94C06F36173B36", "latest"]' // run using optional params while using default concurrency and qps + lotus-bench rpc --method='eth_chainId' --method='eth_getTransactionCount:10:0:["0xd4c70007F3F502f212c7e6794b94C06F36173B36", "latest"]' // run multiple methods at once + +NOTE: The last two examples will not work until we upgrade urfave dependency (tracked in https://github.com/urfave/cli/issues/1734)`, + Flags: []cli.Flag{ + &cli.StringFlag{ + Name: "endpoint", + Value: "http://127.0.0.1:1234/rpc/v1", + Usage: "The rpc endpoint to benchmark", + }, + &cli.DurationFlag{ + Name: "duration", + Value: 60 * time.Second, + Usage: "Duration of benchmark in seconds", + }, + &cli.IntFlag{ + Name: "concurrency", + Value: 10, + Usage: "How many workers should be used per rpc method (can be overridden per method)", + }, + &cli.IntFlag{ + Name: "qps", + Value: 0, + Usage: "How many requests per second should be sent per rpc method (can be overridden per method), a value of 0 means no limit", + }, + &cli.StringSliceFlag{ + Name: "method", + Usage: `Method to benchmark, you can specify multiple methods by repeating this flag. You can also specify method specific options to set the concurrency and qps for each method (see usage). +`, + }, + &cli.DurationFlag{ + Name: "watch", + Value: 0 * time.Second, + Usage: "If >0 then generates reports every N seconds (only supports linux/unix)", + }, + &cli.BoolFlag{ + Name: "print-response", + Value: false, + Usage: "print the response of each request", + }, + }, + Action: func(cctx *cli.Context) error { + if len(cctx.StringSlice("method")) == 0 { + return errors.New("you must specify and least one method to benchmark") + } + + var rpcMethods []*RPCMethod + for _, str := range cctx.StringSlice("method") { + entries := strings.SplitN(str, ":", 4) + if len(entries) == 0 { + return errors.New("invalid method format") + } + + // check if concurrency was specified + concurrency := cctx.Int("concurrency") + if len(entries) > 1 { + if len(entries[1]) > 0 { + var err error + concurrency, err = strconv.Atoi(entries[1]) + if err != nil { + return fmt.Errorf("could not parse concurrency value from method %s: %v", entries[0], err) + } + } + } + + // check if qps was specified + qps := cctx.Int("qps") + if len(entries) > 2 { + if len(entries[2]) > 0 { + var err error + qps, err = strconv.Atoi(entries[2]) + if err != nil { + return fmt.Errorf("could not parse qps value from method %s: %v", entries[0], err) + } + } + } + + // check if params was specified + params := "[]" + if len(entries) > 3 { + params = entries[3] + } + + rpcMethods = append(rpcMethods, &RPCMethod{ + w: os.Stdout, + uri: cctx.String("endpoint"), + method: entries[0], + concurrency: concurrency, + qps: qps, + params: params, + printResp: cctx.Bool("print-response"), + }) + } + + // terminate early on ctrl+c + c := make(chan os.Signal, 1) + signal.Notify(c, os.Interrupt) + go func() { + <-c + fmt.Println("Received interrupt, stopping...") + for _, method := range rpcMethods { + method.Stop() + } + }() + + // stop all threads after duration + go func() { + time.Sleep(cctx.Duration("duration")) + for _, e := range rpcMethods { + e.Stop() + } + }() + + // start all threads + var wg sync.WaitGroup + wg.Add(len(rpcMethods)) + + for _, e := range rpcMethods { + go func(e *RPCMethod) { + defer wg.Done() + err := e.Run() + if err != nil { + fmt.Printf("error running rpc method: %v\n", err) + } + }(e) + } + + // if watch is set then print a report every N seconds + var progressCh chan struct{} + if cctx.Duration("watch") > 0 { + progressCh = make(chan struct{}, 1) + go func(progressCh chan struct{}) { + ticker := time.NewTicker(cctx.Duration("watch")) + for { + clearAndPrintReport := func() { + // clear the screen move the curser to the top left + fmt.Print("\033[2J") + fmt.Printf("\033[%d;%dH", 1, 1) + for i, e := range rpcMethods { + e.Report() + if i < len(rpcMethods)-1 { + fmt.Println() + } + } + } + select { + case <-ticker.C: + clearAndPrintReport() + case <-progressCh: + clearAndPrintReport() + return + } + } + }(progressCh) + } + + wg.Wait() + + if progressCh != nil { + // wait for the watch go routine to return + progressCh <- struct{}{} + + // no need to print the report again + return nil + } + + // print the report for each endpoint + for i, e := range rpcMethods { + e.Report() + if i < len(rpcMethods)-1 { + fmt.Println() + } + } + + return nil + }, +} + +// RPCMethod handles the benchmarking of a single endpoint method. +type RPCMethod struct { + w io.Writer + // the endpoint uri + uri string + // the rpc method we want to benchmark + method string + // the number of concurrent requests to make to this endpoint + concurrency int + // if >0 then limit to qps is the max number of requests per second to make to this endpoint (0 = no limit) + qps int + // many endpoints require specific parameters to be passed + params string + // whether or not to print the response of each request (useful for debugging) + printResp bool + // instruct the worker go routines to stop + stopCh chan struct{} + // when the endpoint bencharking started + start time.Time + // results channel is used by the workers to send results to the reporter + results chan *result + // reporter handles reading the results from workers and printing the report statistics + reporter *Reporter +} + +// result is the result of a single rpc method request. +type result struct { + err error + statusCode *int + duration time.Duration +} + +func (rpc *RPCMethod) Run() error { + client := &http.Client{ + Timeout: 0, + } + + var wg sync.WaitGroup + wg.Add(rpc.concurrency) + + rpc.results = make(chan *result, rpc.concurrency*1_000) + rpc.stopCh = make(chan struct{}, rpc.concurrency) + + go func() { + rpc.reporter = NewReporter(rpc.results, rpc.w) + rpc.reporter.Run() + }() + + rpc.start = time.Now() + + // throttle the number of requests per second + var qpsTicker *time.Ticker + if rpc.qps > 0 { + qpsTicker = time.NewTicker(time.Second / time.Duration(rpc.qps)) + } + + for i := 0; i < rpc.concurrency; i++ { + go func() { + rpc.startWorker(client, qpsTicker) + wg.Done() + }() + } + wg.Wait() + + // close the results channel so reporter will stop + close(rpc.results) + + // wait until the reporter is done + <-rpc.reporter.doneCh + + return nil +} + +func (rpc *RPCMethod) startWorker(client *http.Client, qpsTicker *time.Ticker) { + for { + // check if we should stop + select { + case <-rpc.stopCh: + return + default: + } + + // wait for the next tick if we are rate limiting this endpoint + if qpsTicker != nil { + <-qpsTicker.C + } + + req, err := rpc.buildRequest() + if err != nil { + log.Fatalln(err) + } + + start := time.Now() + + var statusCode *int + + // send request the endpoint + resp, err := client.Do(req) + if err != nil { + err = fmt.Errorf("HTTP error: %s", err.Error()) + } else { + statusCode = &resp.StatusCode + + // there was not a HTTP error but we need to still check the json response for errrors + var data []byte + data, err = io.ReadAll(resp.Body) + if err != nil { + log.Fatalln(err) + } + + // we are only interested if it has the error field in the response + type respData struct { + Error struct { + Code int `json:"code"` + Message string `json:"message"` + } `json:"error"` + } + + // unmarshal the response into a struct so we can check for errors + var d respData + err = json.Unmarshal(data, &d) + if err != nil { + log.Fatalln(err) + } + + // if the response has an error json message then it should be considered an error just like any http error + if len(d.Error.Message) > 0 { + // truncate the error message if it is too long + if len(d.Error.Message) > 1000 { + d.Error.Message = d.Error.Message[:1000] + "..." + } + // remove newlines from the error message so we don't screw up the report + d.Error.Message = strings.ReplaceAll(d.Error.Message, "\n", "") + + err = fmt.Errorf("JSON error: code:%d, message:%s", d.Error.Code, d.Error.Message) + } + + if rpc.printResp { + fmt.Printf("[%s] %s", rpc.method, string(data)) + } + + resp.Body.Close() //nolint:errcheck + } + + rpc.results <- &result{ + statusCode: statusCode, + err: err, + duration: time.Since(start), + } + } +} + +func (rpc *RPCMethod) buildRequest() (*http.Request, error) { + jreq, err := json.Marshal(struct { + Jsonrpc string `json:"jsonrpc"` + ID int `json:"id"` + Method string `json:"method"` + Params json.RawMessage `json:"params"` + }{ + Jsonrpc: "2.0", + Method: rpc.method, + Params: json.RawMessage(rpc.params), + ID: 0, + }) + if err != nil { + return nil, err + } + + req, err := http.NewRequest("POST", rpc.uri, bytes.NewReader(jreq)) + if err != nil { + return nil, err + } + + req.Header.Set("Accept", "application/json") + + return req, nil +} + +func (rpc *RPCMethod) Stop() { + for i := 0; i < rpc.concurrency; i++ { + rpc.stopCh <- struct{}{} + } +} + +func (rpc *RPCMethod) Report() { + total := time.Since(rpc.start) + fmt.Fprintf(rpc.w, "[%s]:\n", rpc.method) + fmt.Fprintf(rpc.w, "- Options:\n") + fmt.Fprintf(rpc.w, " - concurrency: %d\n", rpc.concurrency) + fmt.Fprintf(rpc.w, " - params: %s\n", rpc.params) + fmt.Fprintf(rpc.w, " - qps: %d\n", rpc.qps) + rpc.reporter.Print(total, rpc.w) +} + +// Reporter reads the results from the workers through the results channel and aggregates the results. +type Reporter struct { + // write the report to this writer + w io.Writer + // the reporter read the results from this channel + results chan *result + // doneCh is used to signal that the reporter has finished reading the results (channel has closed) + doneCh chan bool + + // lock protect the following fields during critical sections (if --watch was specified) + lock sync.Mutex + // the latencies of all requests + latencies []int64 + // the number of requests that returned each status code + statusCodes map[int]int + // the number of errors that occurred + errors map[string]int +} + +func NewReporter(results chan *result, w io.Writer) *Reporter { + return &Reporter{ + w: w, + results: results, + doneCh: make(chan bool, 1), + statusCodes: make(map[int]int), + errors: make(map[string]int), + } +} + +func (r *Reporter) Run() { + for res := range r.results { + r.lock.Lock() + + r.latencies = append(r.latencies, res.duration.Milliseconds()) + + if res.statusCode != nil { + r.statusCodes[*res.statusCode]++ + } + + if res.err != nil { + if len(r.errors) < 1_000_000 { + r.errors[res.err.Error()]++ + } else { + // we don't want to store too many errors in memory + r.errors["hidden"]++ + } + } else { + r.errors["nil"]++ + } + + r.lock.Unlock() + } + + r.doneCh <- true +} + +func (r *Reporter) Print(elapsed time.Duration, w io.Writer) { + r.lock.Lock() + defer r.lock.Unlock() + + nrReq := int64(len(r.latencies)) + if nrReq == 0 { + fmt.Println("No requests were made") + return + } + + // we need to sort the latencies slice to calculate the percentiles + sort.Slice(r.latencies, func(i, j int) bool { + return r.latencies[i] < r.latencies[j] + }) + + var totalLatency int64 = 0 + for _, latency := range r.latencies { + totalLatency += latency + } + + fmt.Fprintf(w, "- Total Requests: %d\n", nrReq) + fmt.Fprintf(w, "- Total Duration: %dms\n", elapsed.Milliseconds()) + fmt.Fprintf(w, "- Requests/sec: %f\n", float64(nrReq)/elapsed.Seconds()) + fmt.Fprintf(w, "- Avg latency: %dms\n", totalLatency/nrReq) + fmt.Fprintf(w, "- Median latency: %dms\n", r.latencies[nrReq/2]) + fmt.Fprintf(w, "- Latency distribution:\n") + percentiles := []float64{0.1, 0.5, 0.9, 0.95, 0.99, 0.999} + for _, p := range percentiles { + idx := int64(p * float64(nrReq)) + fmt.Fprintf(w, " %s%% in %dms\n", fmt.Sprintf("%.2f", p*100.0), r.latencies[idx]) + } + + // create a simple histogram with 10 buckets spanning the range of latency + // into equal ranges + // + nrBucket := 10 + buckets := make([]Bucket, nrBucket) + latencyRange := r.latencies[len(r.latencies)-1] + bucketRange := latencyRange / int64(nrBucket) + + // mark the end of each bucket + for i := 0; i < nrBucket; i++ { + buckets[i].start = int64(i) * bucketRange + buckets[i].end = buckets[i].start + bucketRange + // extend the last bucked by any remaning range caused by the integer division + if i == nrBucket-1 { + buckets[i].end = latencyRange + } + } + + // count the number of requests in each bucket + currBucket := 0 + for i := 0; i < len(r.latencies); { + if r.latencies[i] <= buckets[currBucket].end { + buckets[currBucket].cnt++ + i++ + } else { + currBucket++ + } + } + + // print the histogram using a tabwriter which will align the columns nicely + fmt.Fprintf(w, "- Histogram:\n") + const padding = 2 + tabWriter := tabwriter.NewWriter(w, 0, 0, padding, ' ', tabwriter.AlignRight|tabwriter.Debug) + for i := 0; i < nrBucket; i++ { + ratio := float64(buckets[i].cnt) / float64(nrReq) + bars := strings.Repeat("#", int(ratio*100)) + fmt.Fprintf(tabWriter, " %d-%dms\t%d\t%s (%s%%)\n", buckets[i].start, buckets[i].end, buckets[i].cnt, bars, fmt.Sprintf("%.2f", ratio*100)) + } + tabWriter.Flush() //nolint:errcheck + + fmt.Fprintf(w, "- Status codes:\n") + for code, cnt := range r.statusCodes { + fmt.Fprintf(w, " [%d]: %d\n", code, cnt) + } + + // print the 10 most occurring errors (in case error values are not unique) + // + type kv struct { + err string + cnt int + } + var sortedErrors []kv + for err, cnt := range r.errors { + sortedErrors = append(sortedErrors, kv{err, cnt}) + } + sort.Slice(sortedErrors, func(i, j int) bool { + return sortedErrors[i].cnt > sortedErrors[j].cnt + }) + fmt.Fprintf(w, "- Errors (top 10):\n") + for i, se := range sortedErrors { + if i > 10 { + break + } + fmt.Fprintf(w, " [%s]: %d\n", se.err, se.cnt) + } +} + +type Bucket struct { + start int64 + // the end value of the bucket + end int64 + // how many entries are in the bucket + cnt int +} diff --git a/cmd/lotus-miner/sectors.go b/cmd/lotus-miner/sectors.go index 968e92174..3993ecf75 100644 --- a/cmd/lotus-miner/sectors.go +++ b/cmd/lotus-miner/sectors.go @@ -2,6 +2,7 @@ package main import ( "bufio" + "encoding/csv" "encoding/json" "errors" "fmt" @@ -319,6 +320,9 @@ var sectorsListCmd = &cli.Command{ Value: parallelSectorChecks, }, }, + Subcommands: []*cli.Command{ + sectorsListUpgradeBoundsCmd, + }, Action: func(cctx *cli.Context) error { // http mode allows for parallel json decoding/encoding, which was a bottleneck here minerApi, closer, err := lcli.GetStorageMinerAPI(cctx, cliutil.StorageMinerUseHttp) @@ -586,6 +590,169 @@ var sectorsListCmd = &cli.Command{ }, } +var sectorsListUpgradeBoundsCmd = &cli.Command{ + Name: "upgrade-bounds", + Usage: "Output upgrade bounds for available sectors", + Flags: []cli.Flag{ + &cli.IntFlag{ + Name: "buckets", + Value: 25, + }, + &cli.BoolFlag{ + Name: "csv", + Usage: "output machine-readable values", + }, + &cli.BoolFlag{ + Name: "deal-terms", + Usage: "bucket by how many deal-sectors can start at a given expiration", + }, + }, + Action: func(cctx *cli.Context) error { + minerApi, closer, err := lcli.GetStorageMinerAPI(cctx, cliutil.StorageMinerUseHttp) + if err != nil { + return err + } + defer closer() + + fullApi, closer2, err := lcli.GetFullNodeAPI(cctx) + if err != nil { + return err + } + defer closer2() + + ctx := lcli.ReqContext(cctx) + + list, err := minerApi.SectorsListInStates(ctx, []api.SectorState{ + api.SectorState(sealing.Available), + }) + if err != nil { + return xerrors.Errorf("getting sector list: %w", err) + } + + head, err := fullApi.ChainHead(ctx) + if err != nil { + return xerrors.Errorf("getting chain head: %w", err) + } + + filter := bitfield.New() + + for _, s := range list { + filter.Set(uint64(s)) + } + + maddr, err := minerApi.ActorAddress(ctx) + if err != nil { + return err + } + + sset, err := fullApi.StateMinerSectors(ctx, maddr, &filter, head.Key()) + if err != nil { + return err + } + + if len(sset) == 0 { + return nil + } + + var minExpiration, maxExpiration abi.ChainEpoch + + for _, s := range sset { + if s.Expiration < minExpiration || minExpiration == 0 { + minExpiration = s.Expiration + } + if s.Expiration > maxExpiration { + maxExpiration = s.Expiration + } + } + + buckets := cctx.Int("buckets") + bucketSize := (maxExpiration - minExpiration) / abi.ChainEpoch(buckets) + bucketCounts := make([]int, buckets+1) + + for b := range bucketCounts { + bucketMin := minExpiration + abi.ChainEpoch(b)*bucketSize + bucketMax := minExpiration + abi.ChainEpoch(b+1)*bucketSize + + if cctx.Bool("deal-terms") { + bucketMax = bucketMax + policy.MarketDefaultAllocationTermBuffer + } + + for _, s := range sset { + isInBucket := s.Expiration >= bucketMin && s.Expiration < bucketMax + + if isInBucket { + bucketCounts[b]++ + } + } + + } + + // Creating CSV writer + writer := csv.NewWriter(os.Stdout) + + // Writing CSV headers + err = writer.Write([]string{"Max Expiration in Bucket", "Sector Count"}) + if err != nil { + return xerrors.Errorf("writing csv headers: %w", err) + } + + // Writing bucket details + + if cctx.Bool("csv") { + for i := 0; i < buckets; i++ { + maxExp := minExpiration + abi.ChainEpoch(i+1)*bucketSize + + timeStr := strconv.FormatInt(int64(maxExp), 10) + + err = writer.Write([]string{ + timeStr, + strconv.Itoa(bucketCounts[i]), + }) + if err != nil { + return xerrors.Errorf("writing csv row: %w", err) + } + } + + // Flush to make sure all data is written to the underlying writer + writer.Flush() + + if err := writer.Error(); err != nil { + return xerrors.Errorf("flushing csv writer: %w", err) + } + + return nil + } + + tw := tablewriter.New( + tablewriter.Col("Bucket Expiration"), + tablewriter.Col("Sector Count"), + tablewriter.Col("Bar"), + ) + + var barCols = 40 + var maxCount int + + for _, c := range bucketCounts { + if c > maxCount { + maxCount = c + } + } + + for i := 0; i < buckets; i++ { + maxExp := minExpiration + abi.ChainEpoch(i+1)*bucketSize + timeStr := cliutil.EpochTime(head.Height(), maxExp) + + tw.Write(map[string]interface{}{ + "Bucket Expiration": timeStr, + "Sector Count": color.YellowString("%d", bucketCounts[i]), + "Bar": "[" + color.GreenString(strings.Repeat("|", bucketCounts[i]*barCols/maxCount)) + strings.Repeat(" ", barCols-bucketCounts[i]*barCols/maxCount) + "]", + }) + } + + return tw.Flush(os.Stdout) + }, +} + var sectorsRefsCmd = &cli.Command{ Name: "refs", Usage: "List References to sectors", diff --git a/cmd/lotus-shed/sectors.go b/cmd/lotus-shed/sectors.go index 465b0e772..899e0f290 100644 --- a/cmd/lotus-shed/sectors.go +++ b/cmd/lotus-shed/sectors.go @@ -653,7 +653,7 @@ fr32 padding is removed from the output.`, return xerrors.Errorf("getting reader: %w", err) } - rd, err := readStarter(0) + rd, err := readStarter(0, storiface.PaddedByteIndex(length)) if err != nil { return xerrors.Errorf("starting reader: %w", err) } diff --git a/documentation/en/api-v0-methods.md b/documentation/en/api-v0-methods.md index 431d90acd..2acc969d3 100644 --- a/documentation/en/api-v0-methods.md +++ b/documentation/en/api-v0-methods.md @@ -4202,7 +4202,7 @@ Inputs: Response: ```json { - "Channel": "\u003cempty\u003e", + "Channel": "f01234", "From": "f01234", "To": "f01234", "ConfirmedAmt": "0", @@ -4233,7 +4233,7 @@ Inputs: Response: ```json { - "Channel": "\u003cempty\u003e", + "Channel": "f01234", "From": "f01234", "To": "f01234", "ConfirmedAmt": "0", @@ -4953,7 +4953,7 @@ Response: }, "Nonce": 42, "Balance": "0", - "Address": "\u003cempty\u003e" + "Address": "f01234" } } ``` @@ -5242,7 +5242,7 @@ Response: }, "Nonce": 42, "Balance": "0", - "Address": "\u003cempty\u003e" + "Address": "f01234" } ``` @@ -5985,6 +5985,7 @@ Response: "SectorSize": 34359738368, "WindowPoStPartitionSectors": 42, "ConsensusFaultElapsed": 10101, + "PendingOwnerAddress": "f01234", "Beneficiary": "f01234", "BeneficiaryTerm": { "Quota": "0", diff --git a/documentation/en/api-v1-unstable-methods.md b/documentation/en/api-v1-unstable-methods.md index 645df4f9d..e3c97eecf 100644 --- a/documentation/en/api-v1-unstable-methods.md +++ b/documentation/en/api-v1-unstable-methods.md @@ -5576,7 +5576,7 @@ Inputs: Response: ```json { - "Channel": "\u003cempty\u003e", + "Channel": "f01234", "From": "f01234", "To": "f01234", "ConfirmedAmt": "0", @@ -5607,7 +5607,7 @@ Inputs: Response: ```json { - "Channel": "\u003cempty\u003e", + "Channel": "f01234", "From": "f01234", "To": "f01234", "ConfirmedAmt": "0", @@ -6390,7 +6390,7 @@ Response: }, "Nonce": 42, "Balance": "0", - "Address": "\u003cempty\u003e" + "Address": "f01234" } } ``` @@ -6730,7 +6730,7 @@ Response: }, "Nonce": 42, "Balance": "0", - "Address": "\u003cempty\u003e" + "Address": "f01234" } ``` @@ -7505,6 +7505,7 @@ Response: "SectorSize": 34359738368, "WindowPoStPartitionSectors": 42, "ConsensusFaultElapsed": 10101, + "PendingOwnerAddress": "f01234", "Beneficiary": "f01234", "BeneficiaryTerm": { "Quota": "0", diff --git a/documentation/en/cli-lotus-miner.md b/documentation/en/cli-lotus-miner.md index 1446a36a9..1e319eb33 100644 --- a/documentation/en/cli-lotus-miner.md +++ b/documentation/en/cli-lotus-miner.md @@ -654,17 +654,37 @@ NAME: lotus-miner sectors list - List sectors USAGE: - lotus-miner sectors list [command options] [arguments...] + lotus-miner sectors list command [command options] [arguments...] + +COMMANDS: + upgrade-bounds Output upgrade bounds for available sectors + help, h Shows a list of commands or help for one command OPTIONS: - --check-parallelism value number of parallel requests to make for checking sector states (default: 300) - --events, -e display number of events the sector has received (default: false) + --show-removed, -r show removed sectors (default: false) --fast, -f don't show on-chain info for better performance (default: false) + --events, -e display number of events the sector has received (default: false) --initial-pledge, -p display initial pledge (default: false) --seal-time, -t display how long it took for the sector to be sealed (default: false) - --show-removed, -r show removed sectors (default: false) --states value filter sectors by a comma-separated list of states --unproven, -u only show sectors which aren't in the 'Proving' state (default: false) + --check-parallelism value number of parallel requests to make for checking sector states (default: 300) + --help, -h show help (default: false) + +``` + +#### lotus-miner sectors list upgrade-bounds +``` +NAME: + lotus-miner sectors list upgrade-bounds - Output upgrade bounds for available sectors + +USAGE: + lotus-miner sectors list upgrade-bounds [command options] [arguments...] + +OPTIONS: + --buckets value (default: 25) + --csv output machine-readable values (default: false) + --deal-terms bucket by how many deal-sectors can start at a given expiration (default: false) ``` diff --git a/documentation/en/default-lotus-config.toml b/documentation/en/default-lotus-config.toml index cd9332356..960a55fd0 100644 --- a/documentation/en/default-lotus-config.toml +++ b/documentation/en/default-lotus-config.toml @@ -38,7 +38,7 @@ # # type: []string # env var: LOTUS_LIBP2P_LISTENADDRESSES - #ListenAddresses = ["/ip4/0.0.0.0/tcp/0", "/ip6/::/tcp/0"] + #ListenAddresses = ["/ip4/0.0.0.0/tcp/0", "/ip6/::/tcp/0", "/ip4/0.0.0.0/udp/0/quic-v1", "/ip6/::/udp/0/quic-v1", "/ip4/0.0.0.0/udp/0/quic-v1/webtransport", "/ip6/::/udp/0/quic-v1/webtransport"] # Addresses to explicitally announce to other peers. If not specified, # all interface addresses are announced diff --git a/documentation/en/default-lotus-miner-config.toml b/documentation/en/default-lotus-miner-config.toml index 0c8ef4411..c0a204bf1 100644 --- a/documentation/en/default-lotus-miner-config.toml +++ b/documentation/en/default-lotus-miner-config.toml @@ -38,7 +38,7 @@ # # type: []string # env var: LOTUS_LIBP2P_LISTENADDRESSES - #ListenAddresses = ["/ip4/0.0.0.0/tcp/0", "/ip6/::/tcp/0"] + #ListenAddresses = ["/ip4/0.0.0.0/tcp/0", "/ip6/::/tcp/0", "/ip4/0.0.0.0/udp/0/quic-v1", "/ip6/::/udp/0/quic-v1", "/ip4/0.0.0.0/udp/0/quic-v1/webtransport", "/ip6/::/udp/0/quic-v1/webtransport"] # Addresses to explicitally announce to other peers. If not specified, # all interface addresses are announced @@ -515,15 +515,7 @@ # env var: LOTUS_SEALING_MINUPGRADESECTOREXPIRATION #MinUpgradeSectorExpiration = 0 - # When set to a non-zero value, minimum number of epochs until sector expiration above which upgrade candidates will - # be selected based on lowest initial pledge. - # - # Target sector expiration is calculated by looking at the input deal queue, sorting it by deal expiration, and - # selecting N deals from the queue up to sector size. The target expiration will be Nth deal end epoch, or in case - # where there weren't enough deals to fill a sector, DealMaxDuration (540 days = 1555200 epochs) - # - # Setting this to a high value (for example to maximum deal duration - 1555200) will disable selection based on - # initial pledge - upgrade sectors will always be chosen based on longest expiration + # DEPRECATED: Target expiration is no longer used # # type: uint64 # env var: LOTUS_SEALING_MINTARGETUPGRADESECTOREXPIRATION diff --git a/gateway/node.go b/gateway/node.go index d0ff53402..6f1dac73a 100644 --- a/gateway/node.go +++ b/gateway/node.go @@ -43,6 +43,9 @@ const ( // TargetAPI defines the API methods that the Node depends on // (to make it easy to mock for tests) type TargetAPI interface { + MpoolPending(context.Context, types.TipSetKey) ([]*types.SignedMessage, error) + ChainGetBlock(context.Context, cid.Cid) (*types.BlockHeader, error) + MinerGetBaseInfo(context.Context, address.Address, abi.ChainEpoch, types.TipSetKey) (*api.MiningBaseInfo, error) GasEstimateGasPremium(context.Context, uint64, address.Address, int64, types.TipSetKey) (types.BigInt, error) StateReplay(context.Context, types.TipSetKey, cid.Cid) (*api.InvocResult, error) StateMinerSectorCount(context.Context, address.Address, types.TipSetKey) (api.MinerSectors, error) diff --git a/gateway/proxy_fil.go b/gateway/proxy_fil.go index c15a78f5f..abd5371fe 100644 --- a/gateway/proxy_fil.go +++ b/gateway/proxy_fil.go @@ -23,6 +23,33 @@ import ( "github.com/filecoin-project/lotus/node/modules/dtypes" ) +func (gw *Node) MpoolPending(ctx context.Context, tsk types.TipSetKey) ([]*types.SignedMessage, error) { + if err := gw.limit(ctx, stateRateLimitTokens); err != nil { + return nil, err + } + if err := gw.checkTipsetKey(ctx, tsk); err != nil { + return nil, err + } + return gw.target.MpoolPending(ctx, tsk) +} + +func (gw *Node) ChainGetBlock(ctx context.Context, c cid.Cid) (*types.BlockHeader, error) { + if err := gw.limit(ctx, chainRateLimitTokens); err != nil { + return nil, err + } + return gw.target.ChainGetBlock(ctx, c) +} + +func (gw *Node) MinerGetBaseInfo(ctx context.Context, addr address.Address, h abi.ChainEpoch, tsk types.TipSetKey) (*api.MiningBaseInfo, error) { + if err := gw.limit(ctx, stateRateLimitTokens); err != nil { + return nil, err + } + if err := gw.checkTipsetKey(ctx, tsk); err != nil { + return nil, err + } + return gw.target.MinerGetBaseInfo(ctx, addr, h, tsk) +} + func (gw *Node) StateReplay(ctx context.Context, tsk types.TipSetKey, c cid.Cid) (*api.InvocResult, error) { if err := gw.limit(ctx, chainRateLimitTokens); err != nil { return nil, err diff --git a/go.mod b/go.mod index 85e75dd49..f775b488e 100644 --- a/go.mod +++ b/go.mod @@ -41,7 +41,7 @@ require ( github.com/filecoin-project/go-data-transfer/v2 v2.0.0-rc7 github.com/filecoin-project/go-fil-commcid v0.1.0 github.com/filecoin-project/go-fil-commp-hashhash v0.1.0 - github.com/filecoin-project/go-fil-markets v1.28.1 + github.com/filecoin-project/go-fil-markets v1.28.2 github.com/filecoin-project/go-jsonrpc v0.3.1 github.com/filecoin-project/go-padreader v0.0.1 github.com/filecoin-project/go-paramfetch v0.0.4 diff --git a/go.sum b/go.sum index 1b50490c9..210be557c 100644 --- a/go.sum +++ b/go.sum @@ -321,8 +321,8 @@ github.com/filecoin-project/go-fil-commcid v0.1.0 h1:3R4ds1A9r6cr8mvZBfMYxTS88Oq github.com/filecoin-project/go-fil-commcid v0.1.0/go.mod h1:Eaox7Hvus1JgPrL5+M3+h7aSPHc0cVqpSxA+TxIEpZQ= github.com/filecoin-project/go-fil-commp-hashhash v0.1.0 h1:imrrpZWEHRnNqqv0tN7LXep5bFEVOVmQWHJvl2mgsGo= github.com/filecoin-project/go-fil-commp-hashhash v0.1.0/go.mod h1:73S8WSEWh9vr0fDJVnKADhfIv/d6dCbAGaAGWbdJEI8= -github.com/filecoin-project/go-fil-markets v1.28.1 h1:2iq1o/ZY1RSbVU8OX7nwWO66SYtdjKEngshc4Jeua/Y= -github.com/filecoin-project/go-fil-markets v1.28.1/go.mod h1:qy9LNu9t77I184VB6Pa4WKRtGfB8Vl0t8zfOLHkDqWY= +github.com/filecoin-project/go-fil-markets v1.28.2 h1:Ev9o8BYow+lo97Bwc6oOmZ2OxdiHeIDCQsfF/w/Vldc= +github.com/filecoin-project/go-fil-markets v1.28.2/go.mod h1:qy9LNu9t77I184VB6Pa4WKRtGfB8Vl0t8zfOLHkDqWY= github.com/filecoin-project/go-hamt-ipld v0.1.5 h1:uoXrKbCQZ49OHpsTCkrThPNelC4W3LPEk0OrS/ytIBM= github.com/filecoin-project/go-hamt-ipld v0.1.5/go.mod h1:6Is+ONR5Cd5R6XZoCse1CWaXZc0Hdb/JeX+EQCQzX24= github.com/filecoin-project/go-hamt-ipld/v2 v2.0.0 h1:b3UDemBYN2HNfk3KOXNuxgTTxlWi3xVvbQP0IT38fvM= diff --git a/itests/deals_partial_retrieval_dm-level_test.go b/itests/deals_partial_retrieval_dm-level_test.go index 4e59b163b..391c467a9 100644 --- a/itests/deals_partial_retrieval_dm-level_test.go +++ b/itests/deals_partial_retrieval_dm-level_test.go @@ -48,7 +48,7 @@ func TestDMLevelPartialRetrieval(t *testing.T) { ctx := context.Background() kit.QuietMiningLogs() - client, miner, ens := kit.EnsembleMinimal(t, kit.ThroughRPC(), kit.MockProofs()) + client, miner, ens := kit.EnsembleMinimal(t, kit.ThroughRPC()) dh := kit.NewDealHarness(t, client, miner, miner) ens.InterconnectAll().BeginMining(50 * time.Millisecond) diff --git a/itests/kit/blockminer.go b/itests/kit/blockminer.go index 02876ebb3..bd527910d 100644 --- a/itests/kit/blockminer.go +++ b/itests/kit/blockminer.go @@ -184,7 +184,8 @@ func (bm *BlockMiner) MineBlocksMustPost(ctx context.Context, blocktime time.Dur dlinfo, err := bm.miner.FullNode.StateMinerProvingDeadline(ctx, bm.miner.ActorAddr, ts.Key()) require.NoError(bm.t, err) - if ts.Height()+1+abi.ChainEpoch(nulls) >= dlinfo.Last() { // Next block brings us past the last epoch in dline, we need to wait for miner to post + if ts.Height()+5+abi.ChainEpoch(nulls) >= dlinfo.Last() { // Next block brings us past the last epoch in dline, we need to wait for miner to post + bm.t.Logf("forcing post to get in before deadline closes at %d", dlinfo.Last()) bm.forcePoSt(ctx, ts, dlinfo) } @@ -216,7 +217,8 @@ func (bm *BlockMiner) MineBlocksMustPost(ctx context.Context, blocktime time.Dur } if !success { // if we are mining a new null block and it brings us past deadline boundary we need to wait for miner to post - if ts.Height()+1+abi.ChainEpoch(nulls+i) >= dlinfo.Last() { + if ts.Height()+5+abi.ChainEpoch(nulls+i) >= dlinfo.Last() { + bm.t.Logf("forcing post to get in before deadline closes at %d", dlinfo.Last()) bm.forcePoSt(ctx, ts, dlinfo) } } diff --git a/itests/net_test.go b/itests/net_test.go index 2ee8f60b9..332677c23 100644 --- a/itests/net_test.go +++ b/itests/net_test.go @@ -5,6 +5,7 @@ import ( "context" "fmt" "testing" + "time" "github.com/libp2p/go-libp2p/core/network" "github.com/libp2p/go-libp2p/core/peer" @@ -220,14 +221,17 @@ func TestNetBlockIPAddr(t *testing.T) { firstAddrInfo, _ := firstNode.NetAddrsListen(ctx) secondAddrInfo, _ := secondNode.NetAddrsListen(ctx) - var secondNodeIPs []string - + secondNodeIPsMap := map[string]struct{}{} // so we can deduplicate for _, addr := range secondAddrInfo.Addrs { ip, err := manet.ToIP(addr) if err != nil { continue } - secondNodeIPs = append(secondNodeIPs, ip.String()) + secondNodeIPsMap[ip.String()] = struct{}{} + } + var secondNodeIPs []string + for s := range secondNodeIPsMap { + secondNodeIPs = append(secondNodeIPs, s) } // Sanity check that we're not already connected somehow @@ -243,6 +247,8 @@ func TestNetBlockIPAddr(t *testing.T) { list, err := firstNode.NetBlockList(ctx) require.NoError(t, err) + fmt.Println(list.IPAddrs) + fmt.Println(secondNodeIPs) require.Equal(t, len(list.IPAddrs), len(secondNodeIPs), "expected %d blocked IPs", len(secondNodeIPs)) for _, blockedIP := range list.IPAddrs { found := false @@ -256,10 +262,14 @@ func TestNetBlockIPAddr(t *testing.T) { require.True(t, found, "blocked IP %s is not one of secondNodeIPs", blockedIP) } - require.Error(t, secondNode.NetConnect(ctx, firstAddrInfo), "shouldn't be able to connect to second node") - connectedness, err = secondNode.NetConnectedness(ctx, firstAddrInfo.ID) - require.NoError(t, err, "failed to determine connectedness") - require.NotEqual(t, connectedness, network.Connected) + // a QUIC connection might still succeed when gated, but will be killed right after the handshake + _ = secondNode.NetConnect(ctx, firstAddrInfo) + + require.Eventually(t, func() bool { + connectedness, err = secondNode.NetConnectedness(ctx, firstAddrInfo.ID) + require.NoError(t, err, "failed to determine connectedness") + return connectedness != network.Connected + }, time.Second*5, time.Millisecond*10) // stm: @NETWORK_COMMON_BLOCK_REMOVE_001 err = firstNode.NetBlockRemove(ctx, api.NetBlockList{IPAddrs: secondNodeIPs}) diff --git a/lib/readerutil/readerutil.go b/lib/readerutil/readerutil.go new file mode 100644 index 000000000..ea2fed426 --- /dev/null +++ b/lib/readerutil/readerutil.go @@ -0,0 +1,44 @@ +package readerutil + +import ( + "io" + "os" +) + +// NewReadSeekerFromReaderAt returns a new io.ReadSeeker from a io.ReaderAt. +// The returned io.ReadSeeker will read from the io.ReaderAt starting at the +// given base offset. +func NewReadSeekerFromReaderAt(readerAt io.ReaderAt, base int64) io.ReadSeeker { + return &readSeekerFromReaderAt{ + readerAt: readerAt, + base: base, + pos: 0, + } +} + +type readSeekerFromReaderAt struct { + readerAt io.ReaderAt + base int64 + pos int64 +} + +func (rs *readSeekerFromReaderAt) Read(p []byte) (n int, err error) { + n, err = rs.readerAt.ReadAt(p, rs.pos+rs.base) + rs.pos += int64(n) + return n, err +} + +func (rs *readSeekerFromReaderAt) Seek(offset int64, whence int) (int64, error) { + switch whence { + case io.SeekStart: + rs.pos = offset + case io.SeekCurrent: + rs.pos += offset + case io.SeekEnd: + return 0, io.ErrUnexpectedEOF + default: + return 0, os.ErrInvalid + } + + return rs.pos, nil +} diff --git a/metrics/metrics.go b/metrics/metrics.go index 13627a663..ee7bd8695 100644 --- a/metrics/metrics.go +++ b/metrics/metrics.go @@ -61,6 +61,10 @@ var ( // vm execution ExecutionLane, _ = tag.NewKey("lane") + + // piecereader + PRReadType, _ = tag.NewKey("pr_type") // seq / rand + PRReadSize, _ = tag.NewKey("pr_size") // small / big ) // Measures @@ -153,8 +157,9 @@ var ( SchedCycleOpenWindows = stats.Int64("sched/assigner_cycle_open_window", "Number of open windows in scheduling cycles", stats.UnitDimensionless) SchedCycleQueueSize = stats.Int64("sched/assigner_cycle_task_queue_entry", "Number of task queue entries in scheduling cycles", stats.UnitDimensionless) - DagStorePRInitCount = stats.Int64("dagstore/pr_init_count", "PieceReader init count", stats.UnitDimensionless) - DagStorePRBytesRequested = stats.Int64("dagstore/pr_requested_bytes", "PieceReader requested bytes", stats.UnitBytes) + DagStorePRInitCount = stats.Int64("dagstore/pr_init_count", "PieceReader init count", stats.UnitDimensionless) + DagStorePRBytesRequested = stats.Int64("dagstore/pr_requested_bytes", "PieceReader requested bytes", stats.UnitBytes) + DagStorePRBytesDiscarded = stats.Int64("dagstore/pr_discarded_bytes", "PieceReader discarded bytes", stats.UnitBytes) DagStorePRDiscardCount = stats.Int64("dagstore/pr_discard_count", "PieceReader discard count", stats.UnitDimensionless) DagStorePRSeekBackCount = stats.Int64("dagstore/pr_seek_back_count", "PieceReader seek back count", stats.UnitDimensionless) @@ -162,6 +167,12 @@ var ( DagStorePRSeekBackBytes = stats.Int64("dagstore/pr_seek_back_bytes", "PieceReader seek back bytes", stats.UnitBytes) DagStorePRSeekForwardBytes = stats.Int64("dagstore/pr_seek_forward_bytes", "PieceReader seek forward bytes", stats.UnitBytes) + DagStorePRAtHitBytes = stats.Int64("dagstore/pr_at_hit_bytes", "PieceReader ReadAt bytes from cache", stats.UnitBytes) + DagStorePRAtHitCount = stats.Int64("dagstore/pr_at_hit_count", "PieceReader ReadAt from cache hits", stats.UnitDimensionless) + DagStorePRAtCacheFillCount = stats.Int64("dagstore/pr_at_cache_fill_count", "PieceReader ReadAt full cache fill count", stats.UnitDimensionless) + DagStorePRAtReadBytes = stats.Int64("dagstore/pr_at_read_bytes", "PieceReader ReadAt bytes read from source", stats.UnitBytes) // PRReadSize tag + DagStorePRAtReadCount = stats.Int64("dagstore/pr_at_read_count", "PieceReader ReadAt reads from source", stats.UnitDimensionless) // PRReadSize tag + // splitstore SplitstoreMiss = stats.Int64("splitstore/miss", "Number of misses in hotstre access", stats.UnitDimensionless) SplitstoreCompactionTimeSeconds = stats.Float64("splitstore/compaction_time", "Compaction time in seconds", stats.UnitSeconds) @@ -487,6 +498,7 @@ var ( DagStorePRBytesRequestedView = &view.View{ Measure: DagStorePRBytesRequested, Aggregation: view.Sum(), + TagKeys: []tag.Key{PRReadType}, } DagStorePRBytesDiscardedView = &view.View{ Measure: DagStorePRBytesDiscarded, @@ -513,6 +525,29 @@ var ( Aggregation: view.Sum(), } + DagStorePRAtHitBytesView = &view.View{ + Measure: DagStorePRAtHitBytes, + Aggregation: view.Sum(), + } + DagStorePRAtHitCountView = &view.View{ + Measure: DagStorePRAtHitCount, + Aggregation: view.Count(), + } + DagStorePRAtCacheFillCountView = &view.View{ + Measure: DagStorePRAtCacheFillCount, + Aggregation: view.Count(), + } + DagStorePRAtReadBytesView = &view.View{ + Measure: DagStorePRAtReadBytes, + Aggregation: view.Sum(), + TagKeys: []tag.Key{PRReadSize}, + } + DagStorePRAtReadCountView = &view.View{ + Measure: DagStorePRAtReadCount, + Aggregation: view.Count(), + TagKeys: []tag.Key{PRReadSize}, + } + // splitstore SplitstoreMissView = &view.View{ Measure: SplitstoreMiss, @@ -779,6 +814,11 @@ var MinerNodeViews = append([]*view.View{ DagStorePRSeekForwardCountView, DagStorePRSeekBackBytesView, DagStorePRSeekForwardBytesView, + DagStorePRAtHitBytesView, + DagStorePRAtHitCountView, + DagStorePRAtCacheFillCountView, + DagStorePRAtReadBytesView, + DagStorePRAtReadCountView, }, DefaultViews...) var GatewayNodeViews = append([]*view.View{ diff --git a/node/config/def.go b/node/config/def.go index 703288288..42b035c66 100644 --- a/node/config/def.go +++ b/node/config/def.go @@ -57,6 +57,10 @@ func defCommon() Common { ListenAddresses: []string{ "/ip4/0.0.0.0/tcp/0", "/ip6/::/tcp/0", + "/ip4/0.0.0.0/udp/0/quic-v1", + "/ip6/::/udp/0/quic-v1", + "/ip4/0.0.0.0/udp/0/quic-v1/webtransport", + "/ip6/::/udp/0/quic-v1/webtransport", }, AnnounceAddresses: []string{}, NoAnnounceAddresses: []string{}, diff --git a/node/config/doc_gen.go b/node/config/doc_gen.go index 26a254bad..e4d41c1a9 100644 --- a/node/config/doc_gen.go +++ b/node/config/doc_gen.go @@ -1106,15 +1106,7 @@ required to have expiration of at least the soonest-ending deal`, Name: "MinTargetUpgradeSectorExpiration", Type: "uint64", - Comment: `When set to a non-zero value, minimum number of epochs until sector expiration above which upgrade candidates will -be selected based on lowest initial pledge. - -Target sector expiration is calculated by looking at the input deal queue, sorting it by deal expiration, and -selecting N deals from the queue up to sector size. The target expiration will be Nth deal end epoch, or in case -where there weren't enough deals to fill a sector, DealMaxDuration (540 days = 1555200 epochs) - -Setting this to a high value (for example to maximum deal duration - 1555200) will disable selection based on -initial pledge - upgrade sectors will always be chosen based on longest expiration`, + Comment: `DEPRECATED: Target expiration is no longer used`, }, { Name: "CommittedCapacitySectorLifetime", diff --git a/node/config/types.go b/node/config/types.go index 5cbd21bf3..c7fc53645 100644 --- a/node/config/types.go +++ b/node/config/types.go @@ -352,15 +352,7 @@ type SealingConfig struct { // required to have expiration of at least the soonest-ending deal MinUpgradeSectorExpiration uint64 - // When set to a non-zero value, minimum number of epochs until sector expiration above which upgrade candidates will - // be selected based on lowest initial pledge. - // - // Target sector expiration is calculated by looking at the input deal queue, sorting it by deal expiration, and - // selecting N deals from the queue up to sector size. The target expiration will be Nth deal end epoch, or in case - // where there weren't enough deals to fill a sector, DealMaxDuration (540 days = 1555200 epochs) - // - // Setting this to a high value (for example to maximum deal duration - 1555200) will disable selection based on - // initial pledge - upgrade sectors will always be chosen based on longest expiration + // DEPRECATED: Target expiration is no longer used MinTargetUpgradeSectorExpiration uint64 // CommittedCapacitySectorLifetime is the duration a Committed Capacity (CC) sector will diff --git a/node/impl/full/state.go b/node/impl/full/state.go index 45b14d3f7..78f450626 100644 --- a/node/impl/full/state.go +++ b/node/impl/full/state.go @@ -180,6 +180,7 @@ func (m *StateModule) StateMinerInfo(ctx context.Context, actor address.Address, SectorSize: info.SectorSize, WindowPoStPartitionSectors: info.WindowPoStPartitionSectors, ConsensusFaultElapsed: info.ConsensusFaultElapsed, + PendingOwnerAddress: info.PendingOwnerAddress, Beneficiary: info.Beneficiary, BeneficiaryTerm: &info.BeneficiaryTerm, PendingBeneficiaryTerm: info.PendingBeneficiaryTerm, diff --git a/node/modules/storageminer.go b/node/modules/storageminer.go index a4147d83d..74251e21d 100644 --- a/node/modules/storageminer.go +++ b/node/modules/storageminer.go @@ -983,19 +983,18 @@ func NewSetSealConfigFunc(r repo.LockedRepo) (dtypes.SetSealingConfigFunc, error return func(cfg sealiface.Config) (err error) { err = mutateSealingCfg(r, func(c config.SealingConfiger) { newCfg := config.SealingConfig{ - MaxWaitDealsSectors: cfg.MaxWaitDealsSectors, - MaxSealingSectors: cfg.MaxSealingSectors, - MaxSealingSectorsForDeals: cfg.MaxSealingSectorsForDeals, - PreferNewSectorsForDeals: cfg.PreferNewSectorsForDeals, - MaxUpgradingSectors: cfg.MaxUpgradingSectors, - CommittedCapacitySectorLifetime: config.Duration(cfg.CommittedCapacitySectorLifetime), - WaitDealsDelay: config.Duration(cfg.WaitDealsDelay), - MakeNewSectorForDeals: cfg.MakeNewSectorForDeals, - MinUpgradeSectorExpiration: cfg.MinUpgradeSectorExpiration, - MinTargetUpgradeSectorExpiration: cfg.MinTargetUpgradeSectorExpiration, - MakeCCSectorsAvailable: cfg.MakeCCSectorsAvailable, - AlwaysKeepUnsealedCopy: cfg.AlwaysKeepUnsealedCopy, - FinalizeEarly: cfg.FinalizeEarly, + MaxWaitDealsSectors: cfg.MaxWaitDealsSectors, + MaxSealingSectors: cfg.MaxSealingSectors, + MaxSealingSectorsForDeals: cfg.MaxSealingSectorsForDeals, + PreferNewSectorsForDeals: cfg.PreferNewSectorsForDeals, + MaxUpgradingSectors: cfg.MaxUpgradingSectors, + CommittedCapacitySectorLifetime: config.Duration(cfg.CommittedCapacitySectorLifetime), + WaitDealsDelay: config.Duration(cfg.WaitDealsDelay), + MakeNewSectorForDeals: cfg.MakeNewSectorForDeals, + MinUpgradeSectorExpiration: cfg.MinUpgradeSectorExpiration, + MakeCCSectorsAvailable: cfg.MakeCCSectorsAvailable, + AlwaysKeepUnsealedCopy: cfg.AlwaysKeepUnsealedCopy, + FinalizeEarly: cfg.FinalizeEarly, CollateralFromMinerBalance: cfg.CollateralFromMinerBalance, AvailableBalanceBuffer: types.FIL(cfg.AvailableBalanceBuffer), @@ -1027,13 +1026,12 @@ func NewSetSealConfigFunc(r repo.LockedRepo) (dtypes.SetSealingConfigFunc, error func ToSealingConfig(dealmakingCfg config.DealmakingConfig, sealingCfg config.SealingConfig) sealiface.Config { return sealiface.Config{ - MaxWaitDealsSectors: sealingCfg.MaxWaitDealsSectors, - MaxSealingSectors: sealingCfg.MaxSealingSectors, - MaxSealingSectorsForDeals: sealingCfg.MaxSealingSectorsForDeals, - PreferNewSectorsForDeals: sealingCfg.PreferNewSectorsForDeals, - MinUpgradeSectorExpiration: sealingCfg.MinUpgradeSectorExpiration, - MinTargetUpgradeSectorExpiration: sealingCfg.MinTargetUpgradeSectorExpiration, - MaxUpgradingSectors: sealingCfg.MaxUpgradingSectors, + MaxWaitDealsSectors: sealingCfg.MaxWaitDealsSectors, + MaxSealingSectors: sealingCfg.MaxSealingSectors, + MaxSealingSectorsForDeals: sealingCfg.MaxSealingSectorsForDeals, + PreferNewSectorsForDeals: sealingCfg.PreferNewSectorsForDeals, + MinUpgradeSectorExpiration: sealingCfg.MinUpgradeSectorExpiration, + MaxUpgradingSectors: sealingCfg.MaxUpgradingSectors, StartEpochSealingBuffer: abi.ChainEpoch(dealmakingCfg.StartEpochSealingBuffer), MakeNewSectorForDeals: sealingCfg.MakeNewSectorForDeals, diff --git a/storage/paths/http_handler.go b/storage/paths/http_handler.go index fbf1c85b0..4d0539079 100644 --- a/storage/paths/http_handler.go +++ b/storage/paths/http_handler.go @@ -3,6 +3,7 @@ package paths import ( "bytes" "encoding/json" + "io" "net/http" "os" "strconv" @@ -35,7 +36,7 @@ func (d *DefaultPartialFileHandler) HasAllocated(pf *partialfile.PartialFile, of return pf.HasAllocated(offset, size) } -func (d *DefaultPartialFileHandler) Reader(pf *partialfile.PartialFile, offset storiface.PaddedByteIndex, size abi.PaddedPieceSize) (*os.File, error) { +func (d *DefaultPartialFileHandler) Reader(pf *partialfile.PartialFile, offset storiface.PaddedByteIndex, size abi.PaddedPieceSize) (io.Reader, error) { return pf.Reader(offset, size) } diff --git a/storage/paths/http_handler_test.go b/storage/paths/http_handler_test.go index 4987936dd..cf6d71c37 100644 --- a/storage/paths/http_handler_test.go +++ b/storage/paths/http_handler_test.go @@ -207,8 +207,8 @@ func TestRemoteGetAllocated(t *testing.T) { pfhandler := mocks.NewMockPartialFileHandler(mockCtrl) handler := &paths.FetchHandler{ - lstore, - pfhandler, + Local: lstore, + PfHandler: pfhandler, } // run http server diff --git a/storage/paths/interface.go b/storage/paths/interface.go index b0e714c13..d96135de8 100644 --- a/storage/paths/interface.go +++ b/storage/paths/interface.go @@ -2,7 +2,7 @@ package paths import ( "context" - "os" + "io" "github.com/filecoin-project/go-state-types/abi" @@ -24,7 +24,7 @@ type PartialFileHandler interface { HasAllocated(pf *partialfile.PartialFile, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize) (bool, error) // Reader returns a file from which we can read the unsealed piece in the partial file. - Reader(pf *partialfile.PartialFile, offset storiface.PaddedByteIndex, size abi.PaddedPieceSize) (*os.File, error) + Reader(pf *partialfile.PartialFile, offset storiface.PaddedByteIndex, size abi.PaddedPieceSize) (io.Reader, error) // Close closes the partial file Close(pf *partialfile.PartialFile) error diff --git a/storage/paths/mocks/pf.go b/storage/paths/mocks/pf.go index 50b020aaa..43b3bc489 100644 --- a/storage/paths/mocks/pf.go +++ b/storage/paths/mocks/pf.go @@ -5,7 +5,7 @@ package mocks import ( - os "os" + io "io" reflect "reflect" gomock "github.com/golang/mock/gomock" @@ -84,10 +84,10 @@ func (mr *MockPartialFileHandlerMockRecorder) OpenPartialFile(arg0, arg1 interfa } // Reader mocks base method. -func (m *MockPartialFileHandler) Reader(arg0 *partialfile.PartialFile, arg1 storiface.PaddedByteIndex, arg2 abi.PaddedPieceSize) (*os.File, error) { +func (m *MockPartialFileHandler) Reader(arg0 *partialfile.PartialFile, arg1 storiface.PaddedByteIndex, arg2 abi.PaddedPieceSize) (io.Reader, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "Reader", arg0, arg1, arg2) - ret0, _ := ret[0].(*os.File) + ret0, _ := ret[0].(io.Reader) ret1, _ := ret[1].(error) return ret0, ret1 } diff --git a/storage/paths/remote.go b/storage/paths/remote.go index 06bd39bf1..ab23e9789 100644 --- a/storage/paths/remote.go +++ b/storage/paths/remote.go @@ -14,6 +14,7 @@ import ( "sort" "strings" "sync" + "time" "github.com/hashicorp/go-multierror" "golang.org/x/xerrors" @@ -21,6 +22,7 @@ import ( "github.com/filecoin-project/go-state-types/abi" "github.com/filecoin-project/lotus/storage/sealer/fsutil" + "github.com/filecoin-project/lotus/storage/sealer/partialfile" "github.com/filecoin-project/lotus/storage/sealer/storiface" ) @@ -28,6 +30,10 @@ var FetchTempSubdir = "fetching" var CopyBuf = 1 << 20 +// LocalReaderTimeout is the timeout for keeping local reader files open without +// any read activity. +var LocalReaderTimeout = 5 * time.Second + type Remote struct { local Store index SectorIndex @@ -563,7 +569,7 @@ func (r *Remote) CheckIsUnsealed(ctx context.Context, s storiface.SectorRef, off // 1. no worker(local worker included) has an unsealed file for the given sector OR // 2. no worker(local worker included) has the unsealed piece in their unsealed sector file. // Will return a nil reader and a nil error in such a case. -func (r *Remote) Reader(ctx context.Context, s storiface.SectorRef, offset, size abi.PaddedPieceSize) (func(startOffsetAligned storiface.PaddedByteIndex) (io.ReadCloser, error), error) { +func (r *Remote) Reader(ctx context.Context, s storiface.SectorRef, offset, size abi.PaddedPieceSize) (func(startOffsetAligned, endOffsetAligned storiface.PaddedByteIndex) (io.ReadCloser, error), error) { ft := storiface.FTUnsealed // check if we have the unsealed sector file locally @@ -602,20 +608,67 @@ func (r *Remote) Reader(ctx context.Context, s storiface.SectorRef, offset, size if has { log.Infof("returning piece reader for local unsealed piece sector=%+v, (offset=%d, size=%d)", s.ID, offset, size) - return func(startOffsetAligned storiface.PaddedByteIndex) (io.ReadCloser, error) { - // don't reuse between readers unless closed - f := pf - pf = nil + // refs keep track of the currently opened pf + // if they drop to 0 for longer than LocalReaderTimeout, pf will be closed + var refsLk sync.Mutex + refs := 0 - if f == nil { - f, err = r.pfHandler.OpenPartialFile(abi.PaddedPieceSize(ssize), path) - if err != nil { - return nil, xerrors.Errorf("opening partial file: %w", err) + cleanupIdle := func() { + lastRefs := 1 + + for range time.After(LocalReaderTimeout) { + refsLk.Lock() + if refs == 0 && lastRefs == 0 && pf != nil { // pf can't really be nil here, but better be safe + log.Infow("closing idle partial file", "path", path) + err := pf.Close() + if err != nil { + log.Errorw("closing idle partial file", "path", path, "error", err) + } + + pf = nil + refsLk.Unlock() + return } - log.Debugf("local partial file (re)opened %s (+%d,%d)", path, offset, size) + lastRefs = refs + refsLk.Unlock() + } + } + + getPF := func() (*partialfile.PartialFile, func() error, error) { + refsLk.Lock() + defer refsLk.Unlock() + + if pf == nil { + // got closed in the meantime, reopen + + var err error + pf, err = r.pfHandler.OpenPartialFile(abi.PaddedPieceSize(ssize), path) + if err != nil { + return nil, nil, xerrors.Errorf("reopening partial file: %w", err) + } + log.Debugf("local partial file reopened %s (+%d,%d)", path, offset, size) + + go cleanupIdle() } - r, err := r.pfHandler.Reader(f, storiface.PaddedByteIndex(offset)+startOffsetAligned, size-abi.PaddedPieceSize(startOffsetAligned)) + refs++ + + return pf, func() error { + refsLk.Lock() + defer refsLk.Unlock() + + refs-- + return nil + }, nil + } + + return func(startOffsetAligned, endOffsetAligned storiface.PaddedByteIndex) (io.ReadCloser, error) { + pf, done, err := getPF() + if err != nil { + return nil, xerrors.Errorf("getting partialfile handle: %w", err) + } + + r, err := r.pfHandler.Reader(pf, storiface.PaddedByteIndex(offset)+startOffsetAligned, abi.PaddedPieceSize(endOffsetAligned-startOffsetAligned)) if err != nil { return nil, err } @@ -625,25 +678,7 @@ func (r *Remote) Reader(ctx context.Context, s storiface.SectorRef, offset, size io.Closer }{ Reader: r, - Closer: funcCloser(func() error { - // if we already have a reader cached, close this one - if pf != nil { - if f == nil { - return nil - } - if pf == f { - pf = nil - } - - tmp := f - f = nil - return tmp.Close() - } - - // otherwise stash it away for reuse - pf = f - return nil - }), + Closer: funcCloser(done), }, nil }, nil @@ -689,10 +724,10 @@ func (r *Remote) Reader(ctx context.Context, s storiface.SectorRef, offset, size continue } - return func(startOffsetAligned storiface.PaddedByteIndex) (io.ReadCloser, error) { + return func(startOffsetAligned, endOffsetAligned storiface.PaddedByteIndex) (io.ReadCloser, error) { // readRemote fetches a reader that we can use to read the unsealed piece from the remote worker. // It uses a ranged HTTP query to ensure we ONLY read the unsealed piece and not the entire unsealed file. - rd, err := r.readRemote(ctx, url, offset+abi.PaddedPieceSize(startOffsetAligned), size) + rd, err := r.readRemote(ctx, url, offset+abi.PaddedPieceSize(startOffsetAligned), offset+abi.PaddedPieceSize(endOffsetAligned)) if err != nil { log.Warnw("reading from remote", "url", url, "error", err) return nil, err diff --git a/storage/paths/remote_test.go b/storage/paths/remote_test.go index 41d5e8a17..e3376e6fa 100644 --- a/storage/paths/remote_test.go +++ b/storage/paths/remote_test.go @@ -477,7 +477,7 @@ func TestReader(t *testing.T) { require.Nil(t, rdg) require.Contains(t, err.Error(), tc.errStr) } else { - rd, err = rdg(0) + rd, err = rdg(0, storiface.PaddedByteIndex(size)) require.Error(t, err) require.Nil(t, rd) require.Contains(t, err.Error(), tc.errStr) @@ -490,7 +490,7 @@ func TestReader(t *testing.T) { require.Nil(t, rd) } else { require.NotNil(t, rdg) - rd, err := rdg(0) + rd, err := rdg(0, storiface.PaddedByteIndex(size)) require.NoError(t, err) defer func() { diff --git a/storage/pipeline/input.go b/storage/pipeline/input.go index f06359c87..7eb02ae27 100644 --- a/storage/pipeline/input.go +++ b/storage/pipeline/input.go @@ -417,6 +417,13 @@ func (m *Sealing) addPendingPiece(ctx context.Context, size abi.UnpaddedPieceSiz close(pp.doneCh) } + log.Debugw("new pending piece", "dealId", deal.DealID, + "piece", deal.DealProposal.PieceCID, + "size", size, + "dealStart", deal.DealSchedule.StartEpoch, + "dealEnd", deal.DealSchedule.EndEpoch, + "termEnd", ct.claimTermEnd) + m.pendingPieces[proposalCID(deal)] = pp go func() { defer m.inputLk.Unlock() @@ -694,7 +701,6 @@ func (m *Sealing) maybeUpgradeSector(ctx context.Context, sp abi.RegisteredSealP return &pieceBounds[f-1] } - targetExpirationEpoch := ts.Height() + abi.ChainEpoch(cfg.MinTargetUpgradeSectorExpiration) minExpirationEpoch := ts.Height() + abi.ChainEpoch(cfg.MinUpgradeSectorExpiration) var candidate abi.SectorID @@ -732,33 +738,25 @@ func (m *Sealing) maybeUpgradeSector(ctx context.Context, sp abi.RegisteredSealP continue } - // if the sector has less than one sector worth of candidate deals, and - // the best candidate has more candidate deals, this sector isn't better - if pb.dealBytesInBound.Padded() < abi.PaddedPieceSize(ssize) { - if bestDealBytes > pb.dealBytesInBound.Padded() { - continue - } - } - - // if best is below target, we want larger expirations - // if best is above target, we want lower pledge, but only if still above target - - // todo: after nv17 "target expiration" doesn't really make that much sense - // (tho to be fair it doesn't make too much sense now either) - // we probably want the lowest expiration that's still above the configured - // minimum, and can fit most candidate deals - - if bestExpiration < targetExpirationEpoch { - if expirationEpoch > bestExpiration && slowChecks(s.Number) { - bestExpiration = expirationEpoch - bestPledge = pledge - bestDealBytes = pb.dealBytesInBound.Padded() - candidate = s - } + if pb.dealBytesInBound.Padded() == 0 { + log.Debugw("skipping available sector", "sector", s.Number, "reason", "no deals in expiration bounds", "expiration", expirationEpoch) continue } - if expirationEpoch >= targetExpirationEpoch && pledge.LessThan(bestPledge) && slowChecks(s.Number) { + // if the sector has less than one sector worth of candidate deals, and + // the best candidate has more candidate deals, this sector isn't better + + lessThanSectorOfData := pb.dealBytesInBound.Padded() < abi.PaddedPieceSize(ssize) + moreDealsThanBest := pb.dealBytesInBound.Padded() > bestDealBytes + + // we want lower pledge, but only if we have more than one sector worth of deals + + preferDueToDealSize := lessThanSectorOfData && moreDealsThanBest + preferDueToPledge := pledge.LessThan(bestPledge) && !lessThanSectorOfData + + prefer := preferDueToDealSize || preferDueToPledge + + if prefer && slowChecks(s.Number) { bestExpiration = expirationEpoch bestPledge = pledge bestDealBytes = pb.dealBytesInBound.Padded() @@ -767,12 +765,12 @@ func (m *Sealing) maybeUpgradeSector(ctx context.Context, sp abi.RegisteredSealP } if bestExpiration < minExpirationEpoch { - log.Infow("Not upgrading any sectors", "available", len(m.available), "pieces", len(m.pendingPieces), "bestExp", bestExpiration, "target", targetExpirationEpoch, "min", minExpirationEpoch, "candidate", candidate) + log.Infow("Not upgrading any sectors", "available", len(m.available), "pieces", len(m.pendingPieces), "bestExp", bestExpiration, "min", minExpirationEpoch, "candidate", candidate) // didn't find a good sector / no sectors were available return false, nil } - log.Infow("Upgrading sector", "number", candidate.Number, "type", "deal", "proofType", sp, "expiration", bestExpiration, "pledge", types.FIL(bestPledge)) + log.Infow("Upgrading sector", "number", candidate.Number, "type", "deal", "proofType", sp, "expiration", bestExpiration, "pledge", types.FIL(bestPledge), "pieces", len(m.pendingPieces), "dealBytesAtExp", bestDealBytes) delete(m.available, candidate) m.nextDealSector = &candidate.Number return true, m.sectors.Send(uint64(candidate.Number), SectorStartCCUpdate{}) diff --git a/storage/pipeline/sealiface/config.go b/storage/pipeline/sealiface/config.go index 67fba27a6..dbdb91d54 100644 --- a/storage/pipeline/sealiface/config.go +++ b/storage/pipeline/sealiface/config.go @@ -22,8 +22,6 @@ type Config struct { MinUpgradeSectorExpiration uint64 - MinTargetUpgradeSectorExpiration uint64 - MaxUpgradingSectors uint64 MakeNewSectorForDeals bool diff --git a/storage/sealer/fr32/fr32.go b/storage/sealer/fr32/fr32.go index 24175719c..6f5be65b7 100644 --- a/storage/sealer/fr32/fr32.go +++ b/storage/sealer/fr32/fr32.go @@ -8,6 +8,21 @@ import ( "github.com/filecoin-project/go-state-types/abi" ) +// UnpaddedFr32Chunk is the minimum amount of data which can be fr32-padded +// Fr32 padding inserts two zero bits every 254 bits, so the minimum amount of +// data which can be padded is 254 bits. 127 bytes is the smallest multiple of +// 254 bits which has a whole number of bytes. +const UnpaddedFr32Chunk abi.UnpaddedPieceSize = 127 + +// PaddedFr32Chunk is the size of a UnpaddedFr32Chunk chunk after fr32 padding +const PaddedFr32Chunk abi.PaddedPieceSize = 128 + +func init() { + if PaddedFr32Chunk != UnpaddedFr32Chunk.Padded() { + panic("bad math") + } +} + var MTTresh = uint64(512 << 10) func mtChunkCount(usz abi.PaddedPieceSize) uint64 { diff --git a/storage/sealer/partialfile/partialfile.go b/storage/sealer/partialfile/partialfile.go index 6e8c2d843..4357f796d 100644 --- a/storage/sealer/partialfile/partialfile.go +++ b/storage/sealer/partialfile/partialfile.go @@ -13,6 +13,7 @@ import ( rlepluslazy "github.com/filecoin-project/go-bitfield/rle" "github.com/filecoin-project/go-state-types/abi" + "github.com/filecoin-project/lotus/lib/readerutil" "github.com/filecoin-project/lotus/storage/sealer/fsutil" "github.com/filecoin-project/lotus/storage/sealer/storiface" ) @@ -249,7 +250,10 @@ func (pf *PartialFile) Free(offset storiface.PaddedByteIndex, size abi.PaddedPie return nil } -func (pf *PartialFile) Reader(offset storiface.PaddedByteIndex, size abi.PaddedPieceSize) (*os.File, error) { +// Reader forks off a new reader from the underlying file, and returns a reader +// starting at the given offset and reading the given size. Safe for concurrent +// use. +func (pf *PartialFile) Reader(offset storiface.PaddedByteIndex, size abi.PaddedPieceSize) (io.Reader, error) { if _, err := pf.file.Seek(int64(offset), io.SeekStart); err != nil { return nil, xerrors.Errorf("seek piece start: %w", err) } @@ -275,7 +279,7 @@ func (pf *PartialFile) Reader(offset storiface.PaddedByteIndex, size abi.PaddedP } } - return pf.file, nil + return io.LimitReader(readerutil.NewReadSeekerFromReaderAt(pf.file, int64(offset)), int64(size)), nil } func (pf *PartialFile) Allocated() (rlepluslazy.RunIterator, error) { diff --git a/storage/sealer/piece_provider.go b/storage/sealer/piece_provider.go index f49b8b0c7..0e992b679 100644 --- a/storage/sealer/piece_provider.go +++ b/storage/sealer/piece_provider.go @@ -4,8 +4,10 @@ import ( "bufio" "context" "io" + "sync" "github.com/ipfs/go-cid" + pool "github.com/libp2p/go-buffer-pool" "golang.org/x/xerrors" "github.com/filecoin-project/dagstore/mount" @@ -71,7 +73,7 @@ func (p *pieceProvider) IsUnsealed(ctx context.Context, sector storiface.SectorR // It will NOT try to schedule an Unseal of a sealed sector file for the read. // // Returns a nil reader if the piece does NOT exist in any unsealed file or there is no unsealed file for the given sector on any of the workers. -func (p *pieceProvider) tryReadUnsealedPiece(ctx context.Context, pc cid.Cid, sector storiface.SectorRef, pieceOffset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize) (mount.Reader, error) { +func (p *pieceProvider) tryReadUnsealedPiece(ctx context.Context, pc cid.Cid, sector storiface.SectorRef, pieceOffset storiface.UnpaddedByteIndex, pieceSize abi.UnpaddedPieceSize) (mount.Reader, error) { // acquire a lock purely for reading unsealed sectors ctx, cancel := context.WithCancel(ctx) if err := p.index.StorageLock(ctx, sector.ID, storiface.FTUnsealed, storiface.FTNone); err != nil { @@ -82,30 +84,37 @@ func (p *pieceProvider) tryReadUnsealedPiece(ctx context.Context, pc cid.Cid, se // Reader returns a reader getter for an unsealed piece at the given offset in the given sector. // The returned reader will be nil if none of the workers has an unsealed sector file containing // the unsealed piece. - rg, err := p.storage.Reader(ctx, sector, abi.PaddedPieceSize(pieceOffset.Padded()), size.Padded()) + readerGetter, err := p.storage.Reader(ctx, sector, abi.PaddedPieceSize(pieceOffset.Padded()), pieceSize.Padded()) if err != nil { cancel() log.Debugf("did not get storage reader;sector=%+v, err:%s", sector.ID, err) return nil, err } - if rg == nil { + if readerGetter == nil { cancel() return nil, nil } - buf := make([]byte, fr32.BufSize(size.Padded())) - pr, err := (&pieceReader{ - ctx: ctx, - getReader: func(ctx context.Context, startOffset uint64) (io.ReadCloser, error) { - startOffsetAligned := storiface.UnpaddedByteIndex(startOffset / 127 * 127) // floor to multiple of 127 + getReader: func(startOffset, readSize uint64) (io.ReadCloser, error) { + // The request is for unpadded bytes, at any offset. + // storage.Reader readers give us fr32-padded bytes, so we need to + // do the unpadding here. - r, err := rg(startOffsetAligned.Padded()) + startOffsetAligned := storiface.UnpaddedFloor(startOffset) + startOffsetDiff := int(startOffset - uint64(startOffsetAligned)) + + endOffset := startOffset + readSize + endOffsetAligned := storiface.UnpaddedCeil(endOffset) + + r, err := readerGetter(startOffsetAligned.Padded(), endOffsetAligned.Padded()) if err != nil { return nil, xerrors.Errorf("getting reader at +%d: %w", startOffsetAligned, err) } - upr, err := fr32.NewUnpadReaderBuf(r, size.Padded(), buf) + buf := pool.Get(fr32.BufSize(pieceSize.Padded())) + + upr, err := fr32.NewUnpadReaderBuf(r, pieceSize.Padded(), buf) if err != nil { r.Close() // nolint return nil, xerrors.Errorf("creating unpadded reader: %w", err) @@ -113,26 +122,31 @@ func (p *pieceProvider) tryReadUnsealedPiece(ctx context.Context, pc cid.Cid, se bir := bufio.NewReaderSize(upr, 127) if startOffset > uint64(startOffsetAligned) { - if _, err := bir.Discard(int(startOffset - uint64(startOffsetAligned))); err != nil { + if _, err := bir.Discard(startOffsetDiff); err != nil { r.Close() // nolint return nil, xerrors.Errorf("discarding bytes for startOffset: %w", err) } } + var closeOnce sync.Once + return struct { io.Reader io.Closer }{ Reader: bir, Closer: funcCloser(func() error { + closeOnce.Do(func() { + pool.Put(buf) + }) return r.Close() }), }, nil }, - len: size, + len: pieceSize, onClose: cancel, pieceCid: pc, - }).init() + }).init(ctx) if err != nil || pr == nil { // pr == nil to make sure we don't return typed nil cancel() return nil, err diff --git a/storage/sealer/piece_reader.go b/storage/sealer/piece_reader.go index 6ef616e41..7a7cd1841 100644 --- a/storage/sealer/piece_reader.go +++ b/storage/sealer/piece_reader.go @@ -6,8 +6,10 @@ import ( "io" "sync" + lru "github.com/hashicorp/golang-lru/v2" "github.com/ipfs/go-cid" "go.opencensus.io/stats" + "go.opencensus.io/tag" "golang.org/x/xerrors" "github.com/filecoin-project/dagstore/mount" @@ -21,29 +23,48 @@ import ( var MaxPieceReaderBurnBytes int64 = 1 << 20 // 1M var ReadBuf = 128 * (127 * 8) // unpadded(128k) -type pieceGetter func(ctx context.Context, offset uint64) (io.ReadCloser, error) +var MinRandomReadSize = int64(4 << 10) + +type pieceGetter func(offset, size uint64) (io.ReadCloser, error) type pieceReader struct { - ctx context.Context getReader pieceGetter pieceCid cid.Cid len abi.UnpaddedPieceSize onClose context.CancelFunc + seqMCtx context.Context + atMCtx context.Context + closed bool seqAt int64 // next byte to be read by io.Reader - mu sync.Mutex - r io.ReadCloser - br *bufio.Reader - rAt int64 + // sequential reader + seqMu sync.Mutex + r io.ReadCloser + br *bufio.Reader + rAt int64 + + // random read cache + remReads *lru.Cache[int64, []byte] // data start offset -> data + // todo try carrying a "bytes read sequentially so far" counter with those + // cacahed byte buffers, increase buffer sizes when we see that we're doing + // a long sequential read } -func (p *pieceReader) init() (_ *pieceReader, err error) { - stats.Record(p.ctx, metrics.DagStorePRInitCount.M(1)) +func (p *pieceReader) init(ctx context.Context) (_ *pieceReader, err error) { + stats.Record(ctx, metrics.DagStorePRInitCount.M(1)) + + p.seqMCtx, _ = tag.New(ctx, tag.Upsert(metrics.PRReadType, "seq")) + p.atMCtx, _ = tag.New(ctx, tag.Upsert(metrics.PRReadType, "rand")) + + p.remReads, err = lru.New[int64, []byte](100) + if err != nil { + return nil, err + } p.rAt = 0 - p.r, err = p.getReader(p.ctx, uint64(p.rAt)) + p.r, err = p.getReader(uint64(p.rAt), uint64(p.len)) if err != nil { return nil, err } @@ -65,17 +86,14 @@ func (p *pieceReader) check() error { } func (p *pieceReader) Close() error { - p.mu.Lock() - defer p.mu.Unlock() + p.seqMu.Lock() + defer p.seqMu.Unlock() if err := p.check(); err != nil { return err } if p.r != nil { - if err := p.r.Close(); err != nil { - return err - } if err := p.r.Close(); err != nil { return err } @@ -90,21 +108,21 @@ func (p *pieceReader) Close() error { } func (p *pieceReader) Read(b []byte) (int, error) { - p.mu.Lock() - defer p.mu.Unlock() + p.seqMu.Lock() + defer p.seqMu.Unlock() if err := p.check(); err != nil { return 0, err } - n, err := p.readAtUnlocked(b, p.seqAt) + n, err := p.readSeqReader(b) p.seqAt += int64(n) return n, err } func (p *pieceReader) Seek(offset int64, whence int) (int64, error) { - p.mu.Lock() - defer p.mu.Unlock() + p.seqMu.Lock() + defer p.seqMu.Unlock() if err := p.check(); err != nil { return 0, err @@ -124,19 +142,14 @@ func (p *pieceReader) Seek(offset int64, whence int) (int64, error) { return p.seqAt, nil } -func (p *pieceReader) ReadAt(b []byte, off int64) (n int, err error) { - p.mu.Lock() - defer p.mu.Unlock() +func (p *pieceReader) readSeqReader(b []byte) (n int, err error) { + off := p.seqAt - return p.readAtUnlocked(b, off) -} - -func (p *pieceReader) readAtUnlocked(b []byte, off int64) (n int, err error) { if err := p.check(); err != nil { return 0, err } - stats.Record(p.ctx, metrics.DagStorePRBytesRequested.M(int64(len(b)))) + stats.Record(p.seqMCtx, metrics.DagStorePRBytesRequested.M(int64(len(b)))) // 1. Get the backing reader into the correct position @@ -154,13 +167,13 @@ func (p *pieceReader) readAtUnlocked(b []byte, off int64) (n int, err error) { log.Debugw("pieceReader new stream", "piece", p.pieceCid, "at", p.rAt, "off", off-p.rAt, "n", len(b)) if off > p.rAt { - stats.Record(p.ctx, metrics.DagStorePRSeekForwardBytes.M(off-p.rAt), metrics.DagStorePRSeekForwardCount.M(1)) + stats.Record(p.seqMCtx, metrics.DagStorePRSeekForwardBytes.M(off-p.rAt), metrics.DagStorePRSeekForwardCount.M(1)) } else { - stats.Record(p.ctx, metrics.DagStorePRSeekBackBytes.M(p.rAt-off), metrics.DagStorePRSeekBackCount.M(1)) + stats.Record(p.seqMCtx, metrics.DagStorePRSeekBackBytes.M(p.rAt-off), metrics.DagStorePRSeekBackCount.M(1)) } p.rAt = off - p.r, err = p.getReader(p.ctx, uint64(p.rAt)) + p.r, err = p.getReader(uint64(p.rAt), uint64(p.len)) p.br = bufio.NewReaderSize(p.r, ReadBuf) if err != nil { return 0, xerrors.Errorf("getting backing reader: %w", err) @@ -169,7 +182,7 @@ func (p *pieceReader) readAtUnlocked(b []byte, off int64) (n int, err error) { // 2. Check if we need to burn some bytes if off > p.rAt { - stats.Record(p.ctx, metrics.DagStorePRBytesDiscarded.M(off-p.rAt), metrics.DagStorePRDiscardCount.M(1)) + stats.Record(p.seqMCtx, metrics.DagStorePRBytesDiscarded.M(off-p.rAt), metrics.DagStorePRDiscardCount.M(1)) n, err := io.CopyN(io.Discard, p.br, off-p.rAt) p.rAt += n @@ -196,4 +209,99 @@ func (p *pieceReader) readAtUnlocked(b []byte, off int64) (n int, err error) { return n, err } +func (p *pieceReader) ReadAt(b []byte, off int64) (n int, err error) { + stats.Record(p.atMCtx, metrics.DagStorePRBytesRequested.M(int64(len(b)))) + + var filled int64 + + // try to get a buf from lru + data, ok := p.remReads.Get(off) + if ok { + n = copy(b, data) + filled += int64(n) + + if n < len(data) { + p.remReads.Add(off+int64(n), data[n:]) + + // keep the header buffered + if off != 0 { + p.remReads.Remove(off) + } + } + + stats.Record(p.atMCtx, metrics.DagStorePRAtHitBytes.M(int64(n)), metrics.DagStorePRAtHitCount.M(1)) + // dagstore/pr_at_hit_bytes, dagstore/pr_at_hit_count + } + if filled == int64(len(b)) { + // dagstore/pr_at_cache_fill_count + stats.Record(p.atMCtx, metrics.DagStorePRAtCacheFillCount.M(1)) + return n, nil + } + + readOff := off + filled + readSize := int64(len(b)) - filled + + smallRead := readSize < MinRandomReadSize + + if smallRead { + // read into small read buf + readBuf := make([]byte, MinRandomReadSize) + bn, err := p.readInto(readBuf, readOff) + if err != nil && err != io.EOF { + return int(filled), err + } + + _ = stats.RecordWithTags(p.atMCtx, []tag.Mutator{tag.Insert(metrics.PRReadSize, "small")}, metrics.DagStorePRAtReadBytes.M(int64(bn)), metrics.DagStorePRAtReadCount.M(1)) + + // reslice so that the slice is the data + readBuf = readBuf[:bn] + + // fill user data + used := copy(b[filled:], readBuf[:]) + filled += int64(used) + readBuf = readBuf[used:] + + // cache the rest + if len(readBuf) > 0 { + p.remReads.Add(readOff+int64(used), readBuf) + } + } else { + // read into user buf + bn, err := p.readInto(b[filled:], readOff) + if err != nil { + return int(filled), err + } + filled += int64(bn) + + _ = stats.RecordWithTags(p.atMCtx, []tag.Mutator{tag.Insert(metrics.PRReadSize, "big")}, metrics.DagStorePRAtReadBytes.M(int64(bn)), metrics.DagStorePRAtReadCount.M(1)) + } + + if filled < int64(len(b)) { + return int(filled), io.EOF + } + + return int(filled), nil +} + +func (p *pieceReader) readInto(b []byte, off int64) (n int, err error) { + rd, err := p.getReader(uint64(off), uint64(len(b))) + if err != nil { + return 0, xerrors.Errorf("getting reader: %w", err) + } + + n, err = io.ReadFull(rd, b) + + cerr := rd.Close() + + if err == io.ErrUnexpectedEOF { + err = io.EOF + } + + if err != nil { + return n, err + } + + return n, cerr +} + var _ mount.Reader = (*pieceReader)(nil) diff --git a/storage/sealer/storiface/ffi.go b/storage/sealer/storiface/ffi.go index 9696b29db..4a9f832b8 100644 --- a/storage/sealer/storiface/ffi.go +++ b/storage/sealer/storiface/ffi.go @@ -8,6 +8,8 @@ import ( "golang.org/x/xerrors" "github.com/filecoin-project/go-state-types/abi" + + "github.com/filecoin-project/lotus/storage/sealer/fr32" ) var ErrSectorNotFound = errors.New("sector not found") @@ -26,6 +28,14 @@ func (i UnpaddedByteIndex) Valid() error { return nil } +func UnpaddedFloor(n uint64) UnpaddedByteIndex { + return UnpaddedByteIndex(n / uint64(fr32.UnpaddedFr32Chunk) * uint64(fr32.UnpaddedFr32Chunk)) +} + +func UnpaddedCeil(n uint64) UnpaddedByteIndex { + return UnpaddedByteIndex((n + uint64(fr32.UnpaddedFr32Chunk-1)) / uint64(fr32.UnpaddedFr32Chunk) * uint64(fr32.UnpaddedFr32Chunk)) +} + type PaddedByteIndex uint64 type RGetter func(ctx context.Context, id abi.SectorID) (sealed cid.Cid, update bool, err error)