lotus/chain/exchange/protocol.go

208 lines
6.0 KiB
Go
Raw Normal View History

package exchange
2020-07-27 15:31:36 +00:00
import (
"time"
2020-07-30 20:23:42 +00:00
"github.com/filecoin-project/lotus/build"
2020-07-27 15:31:36 +00:00
"github.com/filecoin-project/lotus/chain/store"
"github.com/ipfs/go-cid"
logging "github.com/ipfs/go-log"
"golang.org/x/xerrors"
"github.com/filecoin-project/lotus/chain/types"
)
var log = logging.Logger("chainxchg")
2020-07-27 15:31:36 +00:00
// Protocol identifiers.
const (
	// BlockSyncProtocolID is the protocol ID of the former blocksync
	// protocol.
	//
	// Deprecated: this is the identifier of the former blocksync protocol;
	// ChainExchangeProtocolID identifies the chain exchange protocol.
	BlockSyncProtocolID = "/fil/sync/blk/0.0.1"

	// ChainExchangeProtocolID is the protocol ID of the chain exchange
	// protocol.
	ChainExchangeProtocolID = "/fil/chain/xchg/0.0.1"
)
2020-07-27 15:31:36 +00:00
2020-07-30 20:23:42 +00:00
// MaxRequestLength is the upper end of the range documented for the
// `Length` field of a `Request` (`[1, MaxRequestLength]`). It is
// initialized from `build.ForkLengthThreshold`.
//
// FIXME: Bumped from original 800 to this to accommodate `syncFork()`
// use of `GetBlocks()`. It seems the expectation of that API is to
// fetch any amount of blocks leaving it to the internal logic here
// to partition and reassemble the requests if they go above the maximum.
// (Also as a consequence of this temporarily removing the `const`
// qualifier to avoid "const initializer [...] is not a constant" error.)
var MaxRequestLength = uint64(build.ForkLengthThreshold)
2020-07-27 15:31:36 +00:00
// Constants extracted from the code.
// FIXME: Should be reviewed and confirmed.
const (
	// NOTE(review): presumably the value used to tag peers after a
	// successful exchange — confirm against the call site.
	SuccessPeerTagValue = 25

	// NOTE(review): presumably the number of leading peers that get
	// shuffled when picking candidates — confirm against the call site.
	ShufflePeersPrefix = 16

	// 50 * 1024. NOTE(review): the unit is not stated here, presumably
	// bytes per second — confirm against the call site.
	ReadResMinSpeed = 50 << 10

	// Deadlines. `ReadResDeadline` is defined to be the same duration as
	// `WriteReqDeadline`.
	WriteReqDeadline = 5 * time.Second
	ReadResDeadline  = WriteReqDeadline
	WriteResDeadline = 60 * time.Second
)
2020-07-27 15:31:36 +00:00
// FIXME: Rename. Make private.
type Request struct {
// List of ordered CIDs comprising a `TipSetKey` from where to start
// fetching backwards.
2020-08-05 22:45:10 +00:00
// FIXME: Consider using `TipSetKey` now (introduced after the creation
// of this protocol) instead of converting back and forth.
2020-07-27 15:31:36 +00:00
Head []cid.Cid
// Number of block sets to fetch from `Head` (inclusive, should always
// be in the range `[1, MaxRequestLength]`).
Length uint64
// Request options, see `Options` type for more details. Compressed
// in a single `uint64` to save space.
Options uint64
}
// validatedRequest is a `Request` that has been processed and validated
// to query the tipsets needed. It mirrors the three fields of `Request`,
// with the head held as a `types.TipSetKey` and the options held in
// decoded form.
type validatedRequest struct {
	head    types.TipSetKey
	length  uint64
	options *parsedOptions
}
// Bit flags for the `Options` field of a `Request`. When fetching the
// chain segment we can fetch either block headers, messages, or both
// (the flags may be OR-ed together).
const (
	// Headers asks for the block headers.
	Headers = 1 << iota
	// Messages asks for the messages.
	Messages
)
// parsedOptions holds the request options decompressed into separate
// struct members for easy access during internal processing.
type parsedOptions struct {
	// IncludeHeaders is true when the `Headers` bit is set.
	IncludeHeaders bool
	// IncludeMessages is true when the `Messages` bit is set.
	IncludeMessages bool
}
// noOptionsSet reports whether neither headers nor messages were
// requested.
func (o *parsedOptions) noOptionsSet() bool {
	return !(o.IncludeHeaders || o.IncludeMessages)
}
func parseOptions(optfield uint64) *parsedOptions {
return &parsedOptions{
2020-09-22 23:19:31 +00:00
IncludeHeaders: optfield&(uint64(Headers)) != 0,
IncludeMessages: optfield&(uint64(Messages)) != 0,
2020-07-27 15:31:36 +00:00
}
}
// FIXME: Rename. Make private.
type Response struct {
2020-08-05 22:47:27 +00:00
Status status
2020-07-27 15:31:36 +00:00
// String that complements the error status when converting to an
// internal error (see `statusToError()`).
ErrorMessage string
Chain []*BSTipSet
}
// status is the result code carried in the `Status` field of a `Response`.
type status uint64

// Result codes. Only `Ok` is declared with the `status` type; the other
// codes are untyped integer constants that take the `status` type when
// assigned to or compared with a `status` value.
const (
	Ok status = 0
	// We could not fetch all blocks requested (but at least we returned
	// the `Head` requested). Not considered an error.
	Partial = 101

	// Errors
	NotFound      = 201
	GoAway        = 202
	InternalError = 203
	BadRequest    = 204
)
// Convert status to internal error.
func (res *Response) statusToError() error {
switch res.Status {
case Ok, Partial:
return nil
// FIXME: Consider if we want to not process `Partial` responses
// and return an error instead.
case NotFound:
return xerrors.Errorf("not found")
case GoAway:
return xerrors.Errorf("not handling 'go away' chainxchg responses yet")
2020-07-27 15:31:36 +00:00
case InternalError:
return xerrors.Errorf("block sync peer errored: %s", res.ErrorMessage)
case BadRequest:
return xerrors.Errorf("block sync request invalid: %s", res.ErrorMessage)
default:
return xerrors.Errorf("unrecognized response code: %d", res.Status)
}
}
// FIXME: Rename.
type BSTipSet struct {
2020-09-22 23:19:31 +00:00
// List of blocks belonging to a single tipset to which the
// `CompactedMessages` are linked.
2020-08-05 22:47:27 +00:00
Blocks []*types.BlockHeader
Messages *CompactedMessages
2020-07-27 15:31:36 +00:00
}
2020-07-31 12:25:40 +00:00
// All messages of a single tipset compacted together instead
// of grouped by block to save space, since there are normally
// many repeated messages per tipset in different blocks.
//
// `BlsIncludes`/`SecpkIncludes` matches `Bls`/`Secpk` messages
// to blocks in the tipsets with the format:
// `BlsIncludes[BI][MI]`
// * BI: block index in the tipset.
// * MI: message index in `Bls` list
//
// FIXME: The logic to decompress this structure should belong
// to itself, not to the consumer.
2020-07-27 15:31:36 +00:00
type CompactedMessages struct {
2020-08-05 22:47:27 +00:00
Bls []*types.Message
2020-07-27 15:31:36 +00:00
BlsIncludes [][]uint64
2020-08-05 22:47:27 +00:00
Secpk []*types.SignedMessage
2020-07-27 15:31:36 +00:00
SecpkIncludes [][]uint64
}
// Response that has been validated according to the protocol
// and can be safely accessed.
2020-07-31 12:30:45 +00:00
type validatedResponse struct {
2020-08-05 22:47:27 +00:00
tipsets []*types.TipSet
2020-07-31 12:30:45 +00:00
// List of all messages per tipset (grouped by tipset,
// not by block, hence a single index like `tipsets`).
messages []*CompactedMessages
2020-07-27 15:31:36 +00:00
}
// Decompress messages and form full tipsets with them. The headers
// need to have been requested as well.
2020-08-05 22:47:27 +00:00
func (res *validatedResponse) toFullTipSets() []*store.FullTipSet {
2020-07-31 12:33:40 +00:00
if len(res.tipsets) == 0 || len(res.tipsets) != len(res.messages) {
2020-07-27 15:31:36 +00:00
// This decompression can only be done if both headers and
2020-07-31 12:33:40 +00:00
// messages are returned in the response. (The second check
// is already implied by the guarantees of `validatedResponse`,
// added here just for completeness.)
2020-07-27 15:31:36 +00:00
return nil
}
2020-07-31 12:30:45 +00:00
ftsList := make([]*store.FullTipSet, len(res.tipsets))
for tipsetIdx := range res.tipsets {
2020-07-27 15:31:36 +00:00
fts := &store.FullTipSet{} // FIXME: We should use the `NewFullTipSet` API.
2020-07-31 12:30:45 +00:00
msgs := res.messages[tipsetIdx]
for blockIdx, b := range res.tipsets[tipsetIdx].Blocks() {
2020-07-27 15:31:36 +00:00
fb := &types.FullBlock{
Header: b,
}
for _, mi := range msgs.BlsIncludes[blockIdx] {
fb.BlsMessages = append(fb.BlsMessages, msgs.Bls[mi])
}
for _, mi := range msgs.SecpkIncludes[blockIdx] {
fb.SecpkMessages = append(fb.SecpkMessages, msgs.Secpk[mi])
}
fts.Blocks = append(fts.Blocks, fb)
}
ftsList[tipsetIdx] = fts
}
return ftsList
}