206 lines
5.9 KiB
Go
206 lines
5.9 KiB
Go
package exchange
|
|
|
|
import (
|
|
"time"
|
|
|
|
"github.com/filecoin-project/lotus/build"
|
|
"github.com/filecoin-project/lotus/chain/store"
|
|
|
|
"github.com/ipfs/go-cid"
|
|
logging "github.com/ipfs/go-log"
|
|
"golang.org/x/xerrors"
|
|
|
|
"github.com/filecoin-project/lotus/chain/types"
|
|
)
|
|
|
|
var log = logging.Logger("chainxchg")
|
|
|
|
const (
|
|
// BlockSyncProtocolID is the protocol ID of the former blocksync protocol.
|
|
// Deprecated.
|
|
BlockSyncProtocolID = "/fil/sync/blk/0.0.1"
|
|
|
|
// ChainExchangeProtocolID is the protocol ID of the chain exchange
|
|
// protocol.
|
|
ChainExchangeProtocolID = "/fil/chain/xchg/0.0.1"
|
|
)
|
|
|
|
// FIXME: Bumped from original 800 to this to accommodate `syncFork()`
|
|
// use of `GetBlocks()`. It seems the expectation of that API is to
|
|
// fetch any amount of blocks leaving it to the internal logic here
|
|
// to partition and reassemble the requests if they go above the maximum.
|
|
// (Also as a consequence of this temporarily removing the `const`
|
|
// qualifier to avoid "const initializer [...] is not a constant" error.)
|
|
var MaxRequestLength = uint64(build.ForkLengthThreshold)
|
|
|
|
const (
|
|
// Extracted constants from the code.
|
|
// FIXME: Should be reviewed and confirmed.
|
|
SuccessPeerTagValue = 25
|
|
WriteReqDeadline = 5 * time.Second
|
|
ReadResDeadline = WriteReqDeadline
|
|
ReadResMinSpeed = 50 << 10
|
|
ShufflePeersPrefix = 5
|
|
WriteResDeadline = 60 * time.Second
|
|
)
|
|
|
|
// FIXME: Rename. Make private.
|
|
type Request struct {
|
|
// List of ordered CIDs comprising a `TipSetKey` from where to start
|
|
// fetching backwards.
|
|
// FIXME: Consider using `TipSetKey` now (introduced after the creation
|
|
// of this protocol) instead of converting back and forth.
|
|
Head []cid.Cid
|
|
// Number of block sets to fetch from `Head` (inclusive, should always
|
|
// be in the range `[1, MaxRequestLength]`).
|
|
Length uint64
|
|
// Request options, see `Options` type for more details. Compressed
|
|
// in a single `uint64` to save space.
|
|
Options uint64
|
|
}
|
|
|
|
// `Request` processed and validated to query the tipsets needed.
|
|
type validatedRequest struct {
|
|
head types.TipSetKey
|
|
length uint64
|
|
options *parsedOptions
|
|
}
|
|
|
|
// Request options. When fetching the chain segment we can fetch
|
|
// either block headers, messages, or both.
|
|
const (
|
|
Headers = 1 << iota
|
|
Messages
|
|
)
|
|
|
|
// Decompressed options into separate struct members for easy access
|
|
// during internal processing..
|
|
type parsedOptions struct {
|
|
IncludeHeaders bool
|
|
IncludeMessages bool
|
|
}
|
|
|
|
func (options *parsedOptions) noOptionsSet() bool {
|
|
return options.IncludeHeaders == false &&
|
|
options.IncludeMessages == false
|
|
}
|
|
|
|
func parseOptions(optfield uint64) *parsedOptions {
|
|
return &parsedOptions{
|
|
IncludeHeaders: optfield&(uint64(Headers)) != 0,
|
|
IncludeMessages: optfield&(uint64(Messages)) != 0,
|
|
}
|
|
}
|
|
|
|
// FIXME: Rename. Make private.
|
|
type Response struct {
|
|
Status status
|
|
// String that complements the error status when converting to an
|
|
// internal error (see `statusToError()`).
|
|
ErrorMessage string
|
|
|
|
Chain []*BSTipSet
|
|
}
|
|
|
|
type status uint64
|
|
|
|
const (
|
|
Ok status = 0
|
|
// We could not fetch all blocks requested (but at least we returned
|
|
// the `Head` requested). Not considered an error.
|
|
Partial = 101
|
|
|
|
// Errors
|
|
NotFound = 201
|
|
GoAway = 202
|
|
InternalError = 203
|
|
BadRequest = 204
|
|
)
|
|
|
|
// Convert status to internal error.
|
|
func (res *Response) statusToError() error {
|
|
switch res.Status {
|
|
case Ok, Partial:
|
|
return nil
|
|
// FIXME: Consider if we want to not process `Partial` responses
|
|
// and return an error instead.
|
|
case NotFound:
|
|
return xerrors.Errorf("not found")
|
|
case GoAway:
|
|
return xerrors.Errorf("not handling 'go away' chainxchg responses yet")
|
|
case InternalError:
|
|
return xerrors.Errorf("block sync peer errored: %s", res.ErrorMessage)
|
|
case BadRequest:
|
|
return xerrors.Errorf("block sync request invalid: %s", res.ErrorMessage)
|
|
default:
|
|
return xerrors.Errorf("unrecognized response code: %d", res.Status)
|
|
}
|
|
}
|
|
|
|
// FIXME: Rename.
|
|
type BSTipSet struct {
|
|
Blocks []*types.BlockHeader
|
|
Messages *CompactedMessages
|
|
}
|
|
|
|
// All messages of a single tipset compacted together instead
|
|
// of grouped by block to save space, since there are normally
|
|
// many repeated messages per tipset in different blocks.
|
|
//
|
|
// `BlsIncludes`/`SecpkIncludes` matches `Bls`/`Secpk` messages
|
|
// to blocks in the tipsets with the format:
|
|
// `BlsIncludes[BI][MI]`
|
|
// * BI: block index in the tipset.
|
|
// * MI: message index in `Bls` list
|
|
//
|
|
// FIXME: The logic to decompress this structure should belong
|
|
// to itself, not to the consumer.
|
|
type CompactedMessages struct {
|
|
Bls []*types.Message
|
|
BlsIncludes [][]uint64
|
|
|
|
Secpk []*types.SignedMessage
|
|
SecpkIncludes [][]uint64
|
|
}
|
|
|
|
// Response that has been validated according to the protocol
|
|
// and can be safely accessed.
|
|
type validatedResponse struct {
|
|
tipsets []*types.TipSet
|
|
// List of all messages per tipset (grouped by tipset,
|
|
// not by block, hence a single index like `tipsets`).
|
|
messages []*CompactedMessages
|
|
}
|
|
|
|
// Decompress messages and form full tipsets with them. The headers
|
|
// need to have been requested as well.
|
|
func (res *validatedResponse) toFullTipSets() []*store.FullTipSet {
|
|
if len(res.tipsets) == 0 || len(res.tipsets) != len(res.messages) {
|
|
// This decompression can only be done if both headers and
|
|
// messages are returned in the response. (The second check
|
|
// is already implied by the guarantees of `validatedResponse`,
|
|
// added here just for completeness.)
|
|
return nil
|
|
}
|
|
ftsList := make([]*store.FullTipSet, len(res.tipsets))
|
|
for tipsetIdx := range res.tipsets {
|
|
fts := &store.FullTipSet{} // FIXME: We should use the `NewFullTipSet` API.
|
|
msgs := res.messages[tipsetIdx]
|
|
for blockIdx, b := range res.tipsets[tipsetIdx].Blocks() {
|
|
fb := &types.FullBlock{
|
|
Header: b,
|
|
}
|
|
for _, mi := range msgs.BlsIncludes[blockIdx] {
|
|
fb.BlsMessages = append(fb.BlsMessages, msgs.Bls[mi])
|
|
}
|
|
for _, mi := range msgs.SecpkIncludes[blockIdx] {
|
|
fb.SecpkMessages = append(fb.SecpkMessages, msgs.Secpk[mi])
|
|
}
|
|
|
|
fts.Blocks = append(fts.Blocks, fb)
|
|
}
|
|
ftsList[tipsetIdx] = fts
|
|
}
|
|
return ftsList
|
|
}
|