lotus/chain/exchange/protocol.go
Steven Allen 2b771693eb fix: exchange: allow up to 10k messages per block
Also explicitly limit how many bytes we're willing to read in one go
such that we're capable of reading a worst-case tipset (like, really,
never going to happen worst-case). Previously, this wasn't an issue.
However, we've bumped the max number of messages from 8,192 to 150,000
and need to limit allocations somewhere else.
2023-12-09 21:19:55 +08:00

207 lines
6.0 KiB
Go

package exchange
import (
"time"
"github.com/ipfs/go-cid"
logging "github.com/ipfs/go-log/v2"
"golang.org/x/xerrors"
"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain/store"
"github.com/filecoin-project/lotus/chain/types"
)
var log = logging.Logger("chainxchg")
const (
// ChainExchangeProtocolID is the protocol ID of the chain exchange
// protocol.
ChainExchangeProtocolID = "/fil/chain/xchg/0.0.1"
)
// FIXME: Bumped from original 800 to this to accommodate `syncFork()`
//
// use of `GetBlocks()`. It seems the expectation of that API is to
// fetch any amount of blocks leaving it to the internal logic here
// to partition and reassemble the requests if they go above the maximum.
// (Also as a consequence of this temporarily removing the `const`
// qualifier to avoid "const initializer [...] is not a constant" error.)
var MaxRequestLength = uint64(build.ForkLengthThreshold)
const (
// Extracted constants from the code.
// FIXME: Should be reviewed and confirmed.
SuccessPeerTagValue = 25
WriteReqDeadline = 5 * time.Second
ReadResDeadline = WriteReqDeadline
ReadResMinSpeed = 50 << 10
ShufflePeersPrefix = 16
WriteResDeadline = 60 * time.Second
)
// FIXME: Rename. Make private.
type Request struct {
// List of ordered CIDs comprising a `TipSetKey` from where to start
// fetching backwards.
// FIXME: Consider using `TipSetKey` now (introduced after the creation
// of this protocol) instead of converting back and forth.
Head []cid.Cid
// Number of block sets to fetch from `Head` (inclusive, should always
// be in the range `[1, MaxRequestLength]`).
Length uint64
// Request options, see `Options` type for more details. Compressed
// in a single `uint64` to save space.
Options uint64
}
// `Request` processed and validated to query the tipsets needed.
type validatedRequest struct {
head types.TipSetKey
length uint64
options *parsedOptions
}
// Request options. When fetching the chain segment we can fetch
// either block headers, messages, or both.
const (
Headers = 1 << iota
Messages
)
// Decompressed options into separate struct members for easy access
// during internal processing..
type parsedOptions struct {
IncludeHeaders bool
IncludeMessages bool
}
func (options *parsedOptions) noOptionsSet() bool {
return options.IncludeHeaders == false &&
options.IncludeMessages == false
}
func parseOptions(optfield uint64) *parsedOptions {
return &parsedOptions{
IncludeHeaders: optfield&(uint64(Headers)) != 0,
IncludeMessages: optfield&(uint64(Messages)) != 0,
}
}
// FIXME: Rename. Make private.
type Response struct {
Status status
// String that complements the error status when converting to an
// internal error (see `statusToError()`).
ErrorMessage string
Chain []*BSTipSet
}
type status uint64
const (
Ok status = 0
// We could not fetch all blocks requested (but at least we returned
// the `Head` requested). Not considered an error.
Partial = 101
// Errors
NotFound = 201
GoAway = 202
InternalError = 203
BadRequest = 204
)
// Convert status to internal error.
func (res *Response) statusToError() error {
switch res.Status {
case Ok, Partial:
return nil
// FIXME: Consider if we want to not process `Partial` responses
// and return an error instead.
case NotFound:
return xerrors.Errorf("not found")
case GoAway:
return xerrors.Errorf("not handling 'go away' chainxchg responses yet")
case InternalError:
return xerrors.Errorf("block sync peer errored: %s", res.ErrorMessage)
case BadRequest:
return xerrors.Errorf("block sync request invalid: %s", res.ErrorMessage)
default:
return xerrors.Errorf("unrecognized response code: %d", res.Status)
}
}
// FIXME: Rename.
type BSTipSet struct {
// List of blocks belonging to a single tipset to which the
// `CompactedMessages` are linked.
Blocks []*types.BlockHeader
Messages *CompactedMessages
}
// All messages of a single tipset compacted together instead
// of grouped by block to save space, since there are normally
// many repeated messages per tipset in different blocks.
//
// `BlsIncludes`/`SecpkIncludes` matches `Bls`/`Secpk` messages
// to blocks in the tipsets with the format:
// `BlsIncludes[BI][MI]`
// - BI: block index in the tipset.
// - MI: message index in `Bls` list
//
// FIXME: The logic to decompress this structure should belong
//
// to itself, not to the consumer.
//
// NOTE: Max messages is: BlockMessageLimit (10k) * MaxTipsetSize (15) = 150k
type CompactedMessages struct {
Bls []*types.Message
BlsIncludes [][]uint64
Secpk []*types.SignedMessage
SecpkIncludes [][]uint64
}
// Response that has been validated according to the protocol
// and can be safely accessed.
type validatedResponse struct {
tipsets []*types.TipSet
// List of all messages per tipset (grouped by tipset,
// not by block, hence a single index like `tipsets`).
messages []*CompactedMessages
}
// Decompress messages and form full tipsets with them. The headers
// need to have been requested as well.
func (res *validatedResponse) toFullTipSets() []*store.FullTipSet {
if len(res.tipsets) == 0 || len(res.tipsets) != len(res.messages) {
// This decompression can only be done if both headers and
// messages are returned in the response. (The second check
// is already implied by the guarantees of `validatedResponse`,
// added here just for completeness.)
return nil
}
ftsList := make([]*store.FullTipSet, len(res.tipsets))
for tipsetIdx := range res.tipsets {
fts := &store.FullTipSet{} // FIXME: We should use the `NewFullTipSet` API.
msgs := res.messages[tipsetIdx]
for blockIdx, b := range res.tipsets[tipsetIdx].Blocks() {
fb := &types.FullBlock{
Header: b,
}
for _, mi := range msgs.BlsIncludes[blockIdx] {
fb.BlsMessages = append(fb.BlsMessages, msgs.Bls[mi])
}
for _, mi := range msgs.SecpkIncludes[blockIdx] {
fb.SecpkMessages = append(fb.SecpkMessages, msgs.Secpk[mi])
}
fts.Blocks = append(fts.Blocks, fb)
}
ftsList[tipsetIdx] = fts
}
return ftsList
}