986f240d91
Also explicitly limit how many bytes we're willing to read in one go such that we're capable of reading a worst-case tipset (like, really, never going to happen worst-case). Previously, this wasn't an issue. However, we've bumped the max number of messages from 8,192 to 150,000 and need to limit allocations somewhere else.
207 lines
6.0 KiB
Go
207 lines
6.0 KiB
Go
package exchange
|
|
|
|
import (
|
|
"time"
|
|
|
|
"github.com/ipfs/go-cid"
|
|
logging "github.com/ipfs/go-log/v2"
|
|
"golang.org/x/xerrors"
|
|
|
|
"github.com/filecoin-project/lotus/build"
|
|
"github.com/filecoin-project/lotus/chain/store"
|
|
"github.com/filecoin-project/lotus/chain/types"
|
|
)
|
|
|
|
// log is the package-level logger, registered under the "chainxchg" name.
var log = logging.Logger("chainxchg")
|
|
|
|
// ChainExchangeProtocolID is the protocol ID of the chain exchange
// protocol.
const ChainExchangeProtocolID = "/fil/chain/xchg/0.0.1"
|
|
|
|
// FIXME: Bumped from original 800 to this to accommodate `syncFork()`
|
|
//
|
|
// use of `GetBlocks()`. It seems the expectation of that API is to
|
|
// fetch any amount of blocks leaving it to the internal logic here
|
|
// to partition and reassemble the requests if they go above the maximum.
|
|
// (Also as a consequence of this temporarily removing the `const`
|
|
// qualifier to avoid "const initializer [...] is not a constant" error.)
|
|
var MaxRequestLength = uint64(build.ForkLengthThreshold)
|
|
|
|
// Constants extracted from the code.
// FIXME: Should be reviewed and confirmed.
const (
	// Peer bookkeeping values.
	SuccessPeerTagValue = 25
	ShufflePeersPrefix  = 16

	// Evaluates to 51200. NOTE(review): presumably a bytes-per-second
	// floor for reading a response — confirm against the reader code.
	ReadResMinSpeed = 50 << 10

	// Deadlines. The response read deadline is defined to be the same
	// as the request write deadline.
	WriteReqDeadline = 5 * time.Second
	ReadResDeadline  = WriteReqDeadline
	WriteResDeadline = 60 * time.Second
)
|
|
|
|
// FIXME: Rename. Make private.
|
|
type Request struct {
|
|
// List of ordered CIDs comprising a `TipSetKey` from where to start
|
|
// fetching backwards.
|
|
// FIXME: Consider using `TipSetKey` now (introduced after the creation
|
|
// of this protocol) instead of converting back and forth.
|
|
Head []cid.Cid
|
|
// Number of block sets to fetch from `Head` (inclusive, should always
|
|
// be in the range `[1, MaxRequestLength]`).
|
|
Length uint64
|
|
// Request options, see `Options` type for more details. Compressed
|
|
// in a single `uint64` to save space.
|
|
Options uint64
|
|
}
|
|
|
|
// `Request` processed and validated to query the tipsets needed.
|
|
type validatedRequest struct {
|
|
head types.TipSetKey
|
|
length uint64
|
|
options *parsedOptions
|
|
}
|
|
|
|
// Request options. When fetching the chain segment we can fetch
// either block headers, messages, or both. Each option is a single
// bit so they can be OR-ed together in `Request.Options`.
const (
	Headers = 1 << iota
	Messages
)

// parsedOptions holds the request options decompressed from the
// `uint64` bit field into separate struct members for easy access
// during internal processing.
type parsedOptions struct {
	IncludeHeaders  bool
	IncludeMessages bool
}

// noOptionsSet reports whether neither headers nor messages were
// requested.
func (options *parsedOptions) noOptionsSet() bool {
	return !options.IncludeHeaders && !options.IncludeMessages
}

// parseOptions decodes the `Headers` and `Messages` bits of optfield
// into a parsedOptions. Any other bits of optfield are ignored.
func parseOptions(optfield uint64) *parsedOptions {
	return &parsedOptions{
		IncludeHeaders:  optfield&uint64(Headers) != 0,
		IncludeMessages: optfield&uint64(Messages) != 0,
	}
}
|
|
|
|
// FIXME: Rename. Make private.
|
|
type Response struct {
|
|
Status status
|
|
// String that complements the error status when converting to an
|
|
// internal error (see `statusToError()`).
|
|
ErrorMessage string
|
|
|
|
Chain []*BSTipSet
|
|
}
|
|
|
|
// status is the numeric result code carried in `Response.Status`.
type status uint64

// Response status codes.
//
// NOTE(review): only `Ok` is explicitly declared with type `status`;
// the remaining names are untyped integer constants that convert to
// `status` where one is expected.
const (
	// Ok is the zero status; `statusToError()` maps it to a nil error.
	Ok status = 0

	// We could not fetch all blocks requested (but at least we
	// returned the `Head` requested). Not considered an error.
	Partial = 101

	// Errors
	NotFound      = 201
	GoAway        = 202
	InternalError = 203
	BadRequest    = 204
)
|
|
|
|
// Convert status to internal error.
|
|
func (res *Response) statusToError() error {
|
|
switch res.Status {
|
|
case Ok, Partial:
|
|
return nil
|
|
// FIXME: Consider if we want to not process `Partial` responses
|
|
// and return an error instead.
|
|
case NotFound:
|
|
return xerrors.Errorf("not found")
|
|
case GoAway:
|
|
return xerrors.Errorf("not handling 'go away' chainxchg responses yet")
|
|
case InternalError:
|
|
return xerrors.Errorf("block sync peer errored: %s", res.ErrorMessage)
|
|
case BadRequest:
|
|
return xerrors.Errorf("block sync request invalid: %s", res.ErrorMessage)
|
|
default:
|
|
return xerrors.Errorf("unrecognized response code: %d", res.Status)
|
|
}
|
|
}
|
|
|
|
// FIXME: Rename.
|
|
type BSTipSet struct {
|
|
// List of blocks belonging to a single tipset to which the
|
|
// `CompactedMessages` are linked.
|
|
Blocks []*types.BlockHeader
|
|
Messages *CompactedMessages
|
|
}
|
|
|
|
// All messages of a single tipset compacted together instead
|
|
// of grouped by block to save space, since there are normally
|
|
// many repeated messages per tipset in different blocks.
|
|
//
|
|
// `BlsIncludes`/`SecpkIncludes` matches `Bls`/`Secpk` messages
|
|
// to blocks in the tipsets with the format:
|
|
// `BlsIncludes[BI][MI]`
|
|
// - BI: block index in the tipset.
|
|
// - MI: message index in `Bls` list
|
|
//
|
|
// FIXME: The logic to decompress this structure should belong
|
|
//
|
|
// to itself, not to the consumer.
|
|
//
|
|
// NOTE: Max messages is: BlockMessageLimit (10k) * MaxTipsetSize (15) = 150k
|
|
type CompactedMessages struct {
|
|
Bls []*types.Message
|
|
BlsIncludes [][]uint64
|
|
|
|
Secpk []*types.SignedMessage
|
|
SecpkIncludes [][]uint64
|
|
}
|
|
|
|
// Response that has been validated according to the protocol
|
|
// and can be safely accessed.
|
|
type validatedResponse struct {
|
|
tipsets []*types.TipSet
|
|
// List of all messages per tipset (grouped by tipset,
|
|
// not by block, hence a single index like `tipsets`).
|
|
messages []*CompactedMessages
|
|
}
|
|
|
|
// Decompress messages and form full tipsets with them. The headers
|
|
// need to have been requested as well.
|
|
func (res *validatedResponse) toFullTipSets() []*store.FullTipSet {
|
|
if len(res.tipsets) == 0 || len(res.tipsets) != len(res.messages) {
|
|
// This decompression can only be done if both headers and
|
|
// messages are returned in the response. (The second check
|
|
// is already implied by the guarantees of `validatedResponse`,
|
|
// added here just for completeness.)
|
|
return nil
|
|
}
|
|
ftsList := make([]*store.FullTipSet, len(res.tipsets))
|
|
for tipsetIdx := range res.tipsets {
|
|
fts := &store.FullTipSet{} // FIXME: We should use the `NewFullTipSet` API.
|
|
msgs := res.messages[tipsetIdx]
|
|
for blockIdx, b := range res.tipsets[tipsetIdx].Blocks() {
|
|
fb := &types.FullBlock{
|
|
Header: b,
|
|
}
|
|
for _, mi := range msgs.BlsIncludes[blockIdx] {
|
|
fb.BlsMessages = append(fb.BlsMessages, msgs.Bls[mi])
|
|
}
|
|
for _, mi := range msgs.SecpkIncludes[blockIdx] {
|
|
fb.SecpkMessages = append(fb.SecpkMessages, msgs.Secpk[mi])
|
|
}
|
|
|
|
fts.Blocks = append(fts.Blocks, fb)
|
|
}
|
|
ftsList[tipsetIdx] = fts
|
|
}
|
|
return ftsList
|
|
}
|