Merge pull request #563 from filecoin-project/feat/sync-better
Don't try syncing to a chain if its not clearly heavier
This commit is contained in:
commit
53de5f8049
237
chain/blocksync/blocksync.go
Normal file
237
chain/blocksync/blocksync.go
Normal file
@ -0,0 +1,237 @@
|
|||||||
|
package blocksync
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bufio"
|
||||||
|
"context"
|
||||||
|
|
||||||
|
"github.com/libp2p/go-libp2p-core/protocol"
|
||||||
|
"go.opencensus.io/trace"
|
||||||
|
"golang.org/x/xerrors"
|
||||||
|
|
||||||
|
"github.com/filecoin-project/lotus/chain/store"
|
||||||
|
"github.com/filecoin-project/lotus/chain/types"
|
||||||
|
"github.com/filecoin-project/lotus/lib/cborutil"
|
||||||
|
|
||||||
|
"github.com/ipfs/go-cid"
|
||||||
|
cbor "github.com/ipfs/go-ipld-cbor"
|
||||||
|
logging "github.com/ipfs/go-log"
|
||||||
|
inet "github.com/libp2p/go-libp2p-core/network"
|
||||||
|
"github.com/libp2p/go-libp2p-core/peer"
|
||||||
|
)
|
||||||
|
|
||||||
|
var log = logging.Logger("blocksync")
|
||||||
|
|
||||||
|
type NewStreamFunc func(context.Context, peer.ID, ...protocol.ID) (inet.Stream, error)
|
||||||
|
|
||||||
|
const BlockSyncProtocolID = "/fil/sync/blk/0.0.1"
|
||||||
|
|
||||||
|
func init() {
|
||||||
|
cbor.RegisterCborType(BlockSyncRequest{})
|
||||||
|
cbor.RegisterCborType(BlockSyncResponse{})
|
||||||
|
cbor.RegisterCborType(BSTipSet{})
|
||||||
|
}
|
||||||
|
|
||||||
|
type BlockSyncService struct {
|
||||||
|
cs *store.ChainStore
|
||||||
|
}
|
||||||
|
|
||||||
|
type BlockSyncRequest struct {
|
||||||
|
Start []cid.Cid
|
||||||
|
RequestLength uint64
|
||||||
|
|
||||||
|
Options uint64
|
||||||
|
}
|
||||||
|
|
||||||
|
type BSOptions struct {
|
||||||
|
IncludeBlocks bool
|
||||||
|
IncludeMessages bool
|
||||||
|
}
|
||||||
|
|
||||||
|
func ParseBSOptions(optfield uint64) *BSOptions {
|
||||||
|
return &BSOptions{
|
||||||
|
IncludeBlocks: optfield&(BSOptBlocks) != 0,
|
||||||
|
IncludeMessages: optfield&(BSOptMessages) != 0,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
const (
|
||||||
|
BSOptBlocks = 1 << 0
|
||||||
|
BSOptMessages = 1 << 1
|
||||||
|
)
|
||||||
|
|
||||||
|
type BlockSyncResponse struct {
|
||||||
|
Chain []*BSTipSet
|
||||||
|
|
||||||
|
Status uint64
|
||||||
|
Message string
|
||||||
|
}
|
||||||
|
|
||||||
|
type BSTipSet struct {
|
||||||
|
Blocks []*types.BlockHeader
|
||||||
|
|
||||||
|
BlsMessages []*types.Message
|
||||||
|
BlsMsgIncludes [][]uint64
|
||||||
|
|
||||||
|
SecpkMessages []*types.SignedMessage
|
||||||
|
SecpkMsgIncludes [][]uint64
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewBlockSyncService(cs *store.ChainStore) *BlockSyncService {
|
||||||
|
return &BlockSyncService{
|
||||||
|
cs: cs,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (bss *BlockSyncService) HandleStream(s inet.Stream) {
|
||||||
|
ctx, span := trace.StartSpan(context.Background(), "blocksync.HandleStream")
|
||||||
|
defer span.End()
|
||||||
|
|
||||||
|
defer s.Close()
|
||||||
|
|
||||||
|
var req BlockSyncRequest
|
||||||
|
if err := cborutil.ReadCborRPC(bufio.NewReader(s), &req); err != nil {
|
||||||
|
log.Errorf("failed to read block sync request: %s", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
log.Infof("block sync request for: %s %d", req.Start, req.RequestLength)
|
||||||
|
|
||||||
|
resp, err := bss.processRequest(ctx, &req)
|
||||||
|
if err != nil {
|
||||||
|
log.Error("failed to process block sync request: ", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := cborutil.WriteCborRPC(s, resp); err != nil {
|
||||||
|
log.Error("failed to write back response for handle stream: ", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (bss *BlockSyncService) processRequest(ctx context.Context, req *BlockSyncRequest) (*BlockSyncResponse, error) {
|
||||||
|
ctx, span := trace.StartSpan(ctx, "blocksync.ProcessRequest")
|
||||||
|
defer span.End()
|
||||||
|
|
||||||
|
opts := ParseBSOptions(req.Options)
|
||||||
|
if len(req.Start) == 0 {
|
||||||
|
return &BlockSyncResponse{
|
||||||
|
Status: 204,
|
||||||
|
Message: "no cids given in blocksync request",
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
span.AddAttributes(
|
||||||
|
trace.BoolAttribute("blocks", opts.IncludeBlocks),
|
||||||
|
trace.BoolAttribute("messages", opts.IncludeMessages),
|
||||||
|
)
|
||||||
|
|
||||||
|
chain, err := bss.collectChainSegment(req.Start, req.RequestLength, opts)
|
||||||
|
if err != nil {
|
||||||
|
log.Warn("encountered error while responding to block sync request: ", err)
|
||||||
|
return &BlockSyncResponse{
|
||||||
|
Status: 203,
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
return &BlockSyncResponse{
|
||||||
|
Chain: chain,
|
||||||
|
Status: 0,
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (bss *BlockSyncService) collectChainSegment(start []cid.Cid, length uint64, opts *BSOptions) ([]*BSTipSet, error) {
|
||||||
|
var bstips []*BSTipSet
|
||||||
|
cur := start
|
||||||
|
for {
|
||||||
|
var bst BSTipSet
|
||||||
|
ts, err := bss.cs.LoadTipSet(cur)
|
||||||
|
if err != nil {
|
||||||
|
return nil, xerrors.Errorf("failed loading tipset %s: %w", cur, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if opts.IncludeMessages {
|
||||||
|
bmsgs, bmincl, smsgs, smincl, err := bss.gatherMessages(ts)
|
||||||
|
if err != nil {
|
||||||
|
return nil, xerrors.Errorf("gather messages failed: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
bst.BlsMessages = bmsgs
|
||||||
|
bst.BlsMsgIncludes = bmincl
|
||||||
|
bst.SecpkMessages = smsgs
|
||||||
|
bst.SecpkMsgIncludes = smincl
|
||||||
|
}
|
||||||
|
|
||||||
|
if opts.IncludeBlocks {
|
||||||
|
bst.Blocks = ts.Blocks()
|
||||||
|
}
|
||||||
|
|
||||||
|
bstips = append(bstips, &bst)
|
||||||
|
|
||||||
|
if uint64(len(bstips)) >= length || ts.Height() == 0 {
|
||||||
|
return bstips, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
cur = ts.Parents()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (bss *BlockSyncService) gatherMessages(ts *types.TipSet) ([]*types.Message, [][]uint64, []*types.SignedMessage, [][]uint64, error) {
|
||||||
|
blsmsgmap := make(map[cid.Cid]uint64)
|
||||||
|
secpkmsgmap := make(map[cid.Cid]uint64)
|
||||||
|
var secpkmsgs []*types.SignedMessage
|
||||||
|
var blsmsgs []*types.Message
|
||||||
|
var secpkincl, blsincl [][]uint64
|
||||||
|
|
||||||
|
for _, b := range ts.Blocks() {
|
||||||
|
bmsgs, smsgs, err := bss.cs.MessagesForBlock(b)
|
||||||
|
if err != nil {
|
||||||
|
return nil, nil, nil, nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
bmi := make([]uint64, 0, len(bmsgs))
|
||||||
|
for _, m := range bmsgs {
|
||||||
|
i, ok := blsmsgmap[m.Cid()]
|
||||||
|
if !ok {
|
||||||
|
i = uint64(len(blsmsgs))
|
||||||
|
blsmsgs = append(blsmsgs, m)
|
||||||
|
blsmsgmap[m.Cid()] = i
|
||||||
|
}
|
||||||
|
|
||||||
|
bmi = append(bmi, i)
|
||||||
|
}
|
||||||
|
blsincl = append(blsincl, bmi)
|
||||||
|
|
||||||
|
smi := make([]uint64, 0, len(smsgs))
|
||||||
|
for _, m := range smsgs {
|
||||||
|
i, ok := secpkmsgmap[m.Cid()]
|
||||||
|
if !ok {
|
||||||
|
i = uint64(len(secpkmsgs))
|
||||||
|
secpkmsgs = append(secpkmsgs, m)
|
||||||
|
secpkmsgmap[m.Cid()] = i
|
||||||
|
}
|
||||||
|
|
||||||
|
smi = append(smi, i)
|
||||||
|
}
|
||||||
|
secpkincl = append(secpkincl, smi)
|
||||||
|
}
|
||||||
|
|
||||||
|
return blsmsgs, blsincl, secpkmsgs, secpkincl, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func bstsToFullTipSet(bts *BSTipSet) (*store.FullTipSet, error) {
|
||||||
|
fts := &store.FullTipSet{}
|
||||||
|
for i, b := range bts.Blocks {
|
||||||
|
fb := &types.FullBlock{
|
||||||
|
Header: b,
|
||||||
|
}
|
||||||
|
for _, mi := range bts.BlsMsgIncludes[i] {
|
||||||
|
fb.BlsMessages = append(fb.BlsMessages, bts.BlsMessages[mi])
|
||||||
|
}
|
||||||
|
for _, mi := range bts.SecpkMsgIncludes[i] {
|
||||||
|
fb.SecpkMessages = append(fb.SecpkMessages, bts.SecpkMessages[mi])
|
||||||
|
}
|
||||||
|
|
||||||
|
fts.Blocks = append(fts.Blocks, fb)
|
||||||
|
}
|
||||||
|
|
||||||
|
return fts, nil
|
||||||
|
}
|
@ -1,15 +1,20 @@
|
|||||||
package chain
|
package blocksync
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"bufio"
|
"bufio"
|
||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
"math/rand"
|
"math/rand"
|
||||||
|
"sort"
|
||||||
"sync"
|
"sync"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
blocks "github.com/ipfs/go-block-format"
|
||||||
bserv "github.com/ipfs/go-blockservice"
|
bserv "github.com/ipfs/go-blockservice"
|
||||||
"github.com/libp2p/go-libp2p-core/host"
|
"github.com/ipfs/go-cid"
|
||||||
"github.com/libp2p/go-libp2p-core/protocol"
|
inet "github.com/libp2p/go-libp2p-core/network"
|
||||||
|
"github.com/libp2p/go-libp2p-core/peer"
|
||||||
|
host "github.com/libp2p/go-libp2p-host"
|
||||||
"go.opencensus.io/trace"
|
"go.opencensus.io/trace"
|
||||||
"golang.org/x/xerrors"
|
"golang.org/x/xerrors"
|
||||||
|
|
||||||
@ -17,236 +22,24 @@ import (
|
|||||||
"github.com/filecoin-project/lotus/chain/types"
|
"github.com/filecoin-project/lotus/chain/types"
|
||||||
"github.com/filecoin-project/lotus/lib/cborutil"
|
"github.com/filecoin-project/lotus/lib/cborutil"
|
||||||
"github.com/filecoin-project/lotus/node/modules/dtypes"
|
"github.com/filecoin-project/lotus/node/modules/dtypes"
|
||||||
|
|
||||||
blocks "github.com/ipfs/go-block-format"
|
|
||||||
"github.com/ipfs/go-cid"
|
|
||||||
cbor "github.com/ipfs/go-ipld-cbor"
|
|
||||||
inet "github.com/libp2p/go-libp2p-core/network"
|
|
||||||
"github.com/libp2p/go-libp2p-core/peer"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
type NewStreamFunc func(context.Context, peer.ID, ...protocol.ID) (inet.Stream, error)
|
|
||||||
|
|
||||||
const BlockSyncProtocolID = "/fil/sync/blk/0.0.1"
|
|
||||||
|
|
||||||
func init() {
|
|
||||||
cbor.RegisterCborType(BlockSyncRequest{})
|
|
||||||
cbor.RegisterCborType(BlockSyncResponse{})
|
|
||||||
cbor.RegisterCborType(BSTipSet{})
|
|
||||||
}
|
|
||||||
|
|
||||||
type BlockSyncService struct {
|
|
||||||
cs *store.ChainStore
|
|
||||||
}
|
|
||||||
|
|
||||||
type BlockSyncRequest struct {
|
|
||||||
Start []cid.Cid
|
|
||||||
RequestLength uint64
|
|
||||||
|
|
||||||
Options uint64
|
|
||||||
}
|
|
||||||
|
|
||||||
type BSOptions struct {
|
|
||||||
IncludeBlocks bool
|
|
||||||
IncludeMessages bool
|
|
||||||
}
|
|
||||||
|
|
||||||
func ParseBSOptions(optfield uint64) *BSOptions {
|
|
||||||
return &BSOptions{
|
|
||||||
IncludeBlocks: optfield&(BSOptBlocks) != 0,
|
|
||||||
IncludeMessages: optfield&(BSOptMessages) != 0,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
const (
|
|
||||||
BSOptBlocks = 1 << 0
|
|
||||||
BSOptMessages = 1 << 1
|
|
||||||
)
|
|
||||||
|
|
||||||
type BlockSyncResponse struct {
|
|
||||||
Chain []*BSTipSet
|
|
||||||
|
|
||||||
Status uint64
|
|
||||||
Message string
|
|
||||||
}
|
|
||||||
|
|
||||||
type BSTipSet struct {
|
|
||||||
Blocks []*types.BlockHeader
|
|
||||||
|
|
||||||
BlsMessages []*types.Message
|
|
||||||
BlsMsgIncludes [][]uint64
|
|
||||||
|
|
||||||
SecpkMessages []*types.SignedMessage
|
|
||||||
SecpkMsgIncludes [][]uint64
|
|
||||||
}
|
|
||||||
|
|
||||||
func NewBlockSyncService(cs *store.ChainStore) *BlockSyncService {
|
|
||||||
return &BlockSyncService{
|
|
||||||
cs: cs,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (bss *BlockSyncService) HandleStream(s inet.Stream) {
|
|
||||||
ctx, span := trace.StartSpan(context.Background(), "blocksync.HandleStream")
|
|
||||||
defer span.End()
|
|
||||||
|
|
||||||
defer s.Close()
|
|
||||||
|
|
||||||
var req BlockSyncRequest
|
|
||||||
if err := cborutil.ReadCborRPC(bufio.NewReader(s), &req); err != nil {
|
|
||||||
log.Errorf("failed to read block sync request: %s", err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
log.Infof("block sync request for: %s %d", req.Start, req.RequestLength)
|
|
||||||
|
|
||||||
resp, err := bss.processRequest(ctx, &req)
|
|
||||||
if err != nil {
|
|
||||||
log.Error("failed to process block sync request: ", err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := cborutil.WriteCborRPC(s, resp); err != nil {
|
|
||||||
log.Error("failed to write back response for handle stream: ", err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (bss *BlockSyncService) processRequest(ctx context.Context, req *BlockSyncRequest) (*BlockSyncResponse, error) {
|
|
||||||
ctx, span := trace.StartSpan(ctx, "blocksync.ProcessRequest")
|
|
||||||
defer span.End()
|
|
||||||
|
|
||||||
opts := ParseBSOptions(req.Options)
|
|
||||||
if len(req.Start) == 0 {
|
|
||||||
return &BlockSyncResponse{
|
|
||||||
Status: 204,
|
|
||||||
Message: "no cids given in blocksync request",
|
|
||||||
}, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
span.AddAttributes(
|
|
||||||
trace.BoolAttribute("blocks", opts.IncludeBlocks),
|
|
||||||
trace.BoolAttribute("messages", opts.IncludeMessages),
|
|
||||||
)
|
|
||||||
|
|
||||||
chain, err := bss.collectChainSegment(req.Start, req.RequestLength, opts)
|
|
||||||
if err != nil {
|
|
||||||
log.Error("encountered error while responding to block sync request: ", err)
|
|
||||||
return &BlockSyncResponse{
|
|
||||||
Status: 203,
|
|
||||||
}, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
return &BlockSyncResponse{
|
|
||||||
Chain: chain,
|
|
||||||
Status: 0,
|
|
||||||
}, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (bss *BlockSyncService) collectChainSegment(start []cid.Cid, length uint64, opts *BSOptions) ([]*BSTipSet, error) {
|
|
||||||
var bstips []*BSTipSet
|
|
||||||
cur := start
|
|
||||||
for {
|
|
||||||
var bst BSTipSet
|
|
||||||
ts, err := bss.cs.LoadTipSet(cur)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
if opts.IncludeMessages {
|
|
||||||
bmsgs, bmincl, smsgs, smincl, err := bss.gatherMessages(ts)
|
|
||||||
if err != nil {
|
|
||||||
return nil, xerrors.Errorf("gather messages failed: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
bst.BlsMessages = bmsgs
|
|
||||||
bst.BlsMsgIncludes = bmincl
|
|
||||||
bst.SecpkMessages = smsgs
|
|
||||||
bst.SecpkMsgIncludes = smincl
|
|
||||||
}
|
|
||||||
|
|
||||||
if opts.IncludeBlocks {
|
|
||||||
bst.Blocks = ts.Blocks()
|
|
||||||
}
|
|
||||||
|
|
||||||
bstips = append(bstips, &bst)
|
|
||||||
|
|
||||||
if uint64(len(bstips)) >= length || ts.Height() == 0 {
|
|
||||||
return bstips, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
cur = ts.Parents()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (bss *BlockSyncService) gatherMessages(ts *types.TipSet) ([]*types.Message, [][]uint64, []*types.SignedMessage, [][]uint64, error) {
|
|
||||||
blsmsgmap := make(map[cid.Cid]uint64)
|
|
||||||
secpkmsgmap := make(map[cid.Cid]uint64)
|
|
||||||
var secpkmsgs []*types.SignedMessage
|
|
||||||
var blsmsgs []*types.Message
|
|
||||||
var secpkincl, blsincl [][]uint64
|
|
||||||
|
|
||||||
for _, b := range ts.Blocks() {
|
|
||||||
bmsgs, smsgs, err := bss.cs.MessagesForBlock(b)
|
|
||||||
if err != nil {
|
|
||||||
return nil, nil, nil, nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
bmi := make([]uint64, 0, len(bmsgs))
|
|
||||||
for _, m := range bmsgs {
|
|
||||||
i, ok := blsmsgmap[m.Cid()]
|
|
||||||
if !ok {
|
|
||||||
i = uint64(len(blsmsgs))
|
|
||||||
blsmsgs = append(blsmsgs, m)
|
|
||||||
blsmsgmap[m.Cid()] = i
|
|
||||||
}
|
|
||||||
|
|
||||||
bmi = append(bmi, i)
|
|
||||||
}
|
|
||||||
blsincl = append(blsincl, bmi)
|
|
||||||
|
|
||||||
smi := make([]uint64, 0, len(smsgs))
|
|
||||||
for _, m := range smsgs {
|
|
||||||
i, ok := secpkmsgmap[m.Cid()]
|
|
||||||
if !ok {
|
|
||||||
i = uint64(len(secpkmsgs))
|
|
||||||
secpkmsgs = append(secpkmsgs, m)
|
|
||||||
secpkmsgmap[m.Cid()] = i
|
|
||||||
}
|
|
||||||
|
|
||||||
smi = append(smi, i)
|
|
||||||
}
|
|
||||||
secpkincl = append(secpkincl, smi)
|
|
||||||
}
|
|
||||||
|
|
||||||
return blsmsgs, blsincl, secpkmsgs, secpkincl, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
type BlockSync struct {
|
type BlockSync struct {
|
||||||
bserv bserv.BlockService
|
bserv bserv.BlockService
|
||||||
newStream NewStreamFunc
|
host host.Host
|
||||||
|
|
||||||
syncPeersLk sync.Mutex
|
syncPeersLk sync.Mutex
|
||||||
syncPeers map[peer.ID]struct{}
|
syncPeers *bsPeerTracker
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewBlockSyncClient(bserv dtypes.ChainBlockService, h host.Host) *BlockSync {
|
func NewBlockSyncClient(bserv dtypes.ChainBlockService, h host.Host) *BlockSync {
|
||||||
return &BlockSync{
|
return &BlockSync{
|
||||||
bserv: bserv,
|
bserv: bserv,
|
||||||
newStream: h.NewStream,
|
host: h,
|
||||||
syncPeers: make(map[peer.ID]struct{}),
|
syncPeers: newPeerTracker(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (bs *BlockSync) getPeers() []peer.ID {
|
|
||||||
bs.syncPeersLk.Lock()
|
|
||||||
defer bs.syncPeersLk.Unlock()
|
|
||||||
var out []peer.ID
|
|
||||||
for p := range bs.syncPeers {
|
|
||||||
out = append(out, p)
|
|
||||||
}
|
|
||||||
return out
|
|
||||||
}
|
|
||||||
|
|
||||||
func (bs *BlockSync) processStatus(req *BlockSyncRequest, res *BlockSyncResponse) error {
|
func (bs *BlockSync) processStatus(req *BlockSyncRequest, res *BlockSyncResponse) error {
|
||||||
switch res.Status {
|
switch res.Status {
|
||||||
case 101: // Partial Response
|
case 101: // Partial Response
|
||||||
@ -274,22 +67,20 @@ func (bs *BlockSync) GetBlocks(ctx context.Context, tipset []cid.Cid, count int)
|
|||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
peers := bs.getPeers()
|
|
||||||
perm := rand.Perm(len(peers))
|
|
||||||
// TODO: round robin through these peers on error
|
|
||||||
|
|
||||||
req := &BlockSyncRequest{
|
req := &BlockSyncRequest{
|
||||||
Start: tipset,
|
Start: tipset,
|
||||||
RequestLength: uint64(count),
|
RequestLength: uint64(count),
|
||||||
Options: BSOptBlocks,
|
Options: BSOptBlocks,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
peers := bs.getPeers()
|
||||||
|
|
||||||
var oerr error
|
var oerr error
|
||||||
for _, p := range perm {
|
for _, p := range peers {
|
||||||
res, err := bs.sendRequestToPeer(ctx, peers[p], req)
|
res, err := bs.sendRequestToPeer(ctx, p, req)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
oerr = err
|
oerr = err
|
||||||
log.Warnf("BlockSync request failed for peer %s: %s", peers[p].String(), err)
|
log.Warnf("BlockSync request failed for peer %s: %s", p.String(), err)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -298,7 +89,7 @@ func (bs *BlockSync) GetBlocks(ctx context.Context, tipset []cid.Cid, count int)
|
|||||||
}
|
}
|
||||||
oerr = bs.processStatus(req, res)
|
oerr = bs.processStatus(req, res)
|
||||||
if oerr != nil {
|
if oerr != nil {
|
||||||
log.Warnf("BlockSync peer %s response was an error: %s", peers[p].String(), oerr)
|
log.Warnf("BlockSync peer %s response was an error: %s", p.String(), oerr)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return nil, xerrors.Errorf("GetBlocks failed with all peers: %w", oerr)
|
return nil, xerrors.Errorf("GetBlocks failed with all peers: %w", oerr)
|
||||||
@ -327,11 +118,11 @@ func (bs *BlockSync) GetFullTipSet(ctx context.Context, p peer.ID, h []cid.Cid)
|
|||||||
|
|
||||||
return bstsToFullTipSet(bts)
|
return bstsToFullTipSet(bts)
|
||||||
case 101: // Partial Response
|
case 101: // Partial Response
|
||||||
panic("not handled")
|
return nil, xerrors.Errorf("partial responses are not handled")
|
||||||
case 201: // req.Start not found
|
case 201: // req.Start not found
|
||||||
return nil, fmt.Errorf("not found")
|
return nil, fmt.Errorf("not found")
|
||||||
case 202: // Go Away
|
case 202: // Go Away
|
||||||
panic("not handled")
|
return nil, xerrors.Errorf("received 'go away' response peer")
|
||||||
case 203: // Internal Error
|
case 203: // Internal Error
|
||||||
return nil, fmt.Errorf("block sync peer errored: %q", res.Message)
|
return nil, fmt.Errorf("block sync peer errored: %q", res.Message)
|
||||||
case 204: // Invalid Request
|
case 204: // Invalid Request
|
||||||
@ -376,29 +167,20 @@ func (bs *BlockSync) GetChainMessages(ctx context.Context, h *types.TipSet, coun
|
|||||||
return nil, xerrors.Errorf("GetChainMessages failed with all peers(%d): %w", len(peers), err)
|
return nil, xerrors.Errorf("GetChainMessages failed with all peers(%d): %w", len(peers), err)
|
||||||
}
|
}
|
||||||
|
|
||||||
func bstsToFullTipSet(bts *BSTipSet) (*store.FullTipSet, error) {
|
|
||||||
fts := &store.FullTipSet{}
|
|
||||||
for i, b := range bts.Blocks {
|
|
||||||
fb := &types.FullBlock{
|
|
||||||
Header: b,
|
|
||||||
}
|
|
||||||
for _, mi := range bts.BlsMsgIncludes[i] {
|
|
||||||
fb.BlsMessages = append(fb.BlsMessages, bts.BlsMessages[mi])
|
|
||||||
}
|
|
||||||
for _, mi := range bts.SecpkMsgIncludes[i] {
|
|
||||||
fb.SecpkMessages = append(fb.SecpkMessages, bts.SecpkMessages[mi])
|
|
||||||
}
|
|
||||||
|
|
||||||
fts.Blocks = append(fts.Blocks, fb)
|
|
||||||
}
|
|
||||||
|
|
||||||
return fts, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (bs *BlockSync) sendRequestToPeer(ctx context.Context, p peer.ID, req *BlockSyncRequest) (*BlockSyncResponse, error) {
|
func (bs *BlockSync) sendRequestToPeer(ctx context.Context, p peer.ID, req *BlockSyncRequest) (*BlockSyncResponse, error) {
|
||||||
s, err := bs.newStream(inet.WithNoDial(ctx, "should already have connection"), p, BlockSyncProtocolID)
|
ctx, span := trace.StartSpan(ctx, "sendRequestToPeer")
|
||||||
|
defer span.End()
|
||||||
|
|
||||||
|
if span.IsRecordingEvents() {
|
||||||
|
span.AddAttributes(
|
||||||
|
trace.StringAttribute("peer", p.Pretty()),
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
s, err := bs.host.NewStream(inet.WithNoDial(ctx, "should already have connection"), p, BlockSyncProtocolID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
bs.RemovePeer(p)
|
||||||
|
return nil, xerrors.Errorf("failed to open stream to peer: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := cborutil.WriteCborRPC(s, req); err != nil {
|
if err := cborutil.WriteCborRPC(s, req); err != nil {
|
||||||
@ -447,9 +229,15 @@ func (bs *BlockSync) GetBlock(ctx context.Context, c cid.Cid) (*types.BlockHeade
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (bs *BlockSync) AddPeer(p peer.ID) {
|
func (bs *BlockSync) AddPeer(p peer.ID) {
|
||||||
bs.syncPeersLk.Lock()
|
bs.syncPeers.addPeer(p)
|
||||||
defer bs.syncPeersLk.Unlock()
|
}
|
||||||
bs.syncPeers[p] = struct{}{}
|
|
||||||
|
func (bs *BlockSync) RemovePeer(p peer.ID) {
|
||||||
|
bs.syncPeers.removePeer(p)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (bs *BlockSync) getPeers() []peer.ID {
|
||||||
|
return bs.syncPeers.prefSortedPeers()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (bs *BlockSync) FetchMessagesByCids(ctx context.Context, cids []cid.Cid) ([]*types.Message, error) {
|
func (bs *BlockSync) FetchMessagesByCids(ctx context.Context, cids []cid.Cid) ([]*types.Message, error) {
|
||||||
@ -528,3 +316,83 @@ func (bs *BlockSync) fetchCids(ctx context.Context, cids []cid.Cid, cb func(int,
|
|||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type peerStats struct {
|
||||||
|
successes int
|
||||||
|
failures int
|
||||||
|
firstSeen time.Time
|
||||||
|
}
|
||||||
|
|
||||||
|
type bsPeerTracker struct {
|
||||||
|
peers map[peer.ID]*peerStats
|
||||||
|
lk sync.Mutex
|
||||||
|
}
|
||||||
|
|
||||||
|
func newPeerTracker() *bsPeerTracker {
|
||||||
|
return &bsPeerTracker{
|
||||||
|
peers: make(map[peer.ID]*peerStats),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
func (bpt *bsPeerTracker) addPeer(p peer.ID) {
|
||||||
|
bpt.lk.Lock()
|
||||||
|
defer bpt.lk.Unlock()
|
||||||
|
if _, ok := bpt.peers[p]; ok {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
bpt.peers[p] = &peerStats{
|
||||||
|
firstSeen: time.Now(),
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
func (bpt *bsPeerTracker) prefSortedPeers() []peer.ID {
|
||||||
|
// TODO: this could probably be cached, but as long as its not too many peers, fine for now
|
||||||
|
bpt.lk.Lock()
|
||||||
|
defer bpt.lk.Unlock()
|
||||||
|
out := make([]peer.ID, 0, len(bpt.peers))
|
||||||
|
for p := range bpt.peers {
|
||||||
|
out = append(out, p)
|
||||||
|
}
|
||||||
|
|
||||||
|
sort.Slice(out, func(i, j int) bool {
|
||||||
|
pi := bpt.peers[out[i]]
|
||||||
|
pj := bpt.peers[out[j]]
|
||||||
|
if pi.successes > pj.successes {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
if pi.failures < pj.successes {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
return pi.firstSeen.Before(pj.firstSeen)
|
||||||
|
})
|
||||||
|
|
||||||
|
return out
|
||||||
|
}
|
||||||
|
|
||||||
|
func (bpt *bsPeerTracker) logSuccess(p peer.ID) {
|
||||||
|
bpt.lk.Lock()
|
||||||
|
defer bpt.lk.Unlock()
|
||||||
|
if pi, ok := bpt.peers[p]; !ok {
|
||||||
|
log.Warn("log success called on peer not in tracker")
|
||||||
|
return
|
||||||
|
} else {
|
||||||
|
pi.successes++
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (bpt *bsPeerTracker) logFailure(p peer.ID) {
|
||||||
|
bpt.lk.Lock()
|
||||||
|
defer bpt.lk.Unlock()
|
||||||
|
if pi, ok := bpt.peers[p]; !ok {
|
||||||
|
log.Warn("log failure called on peer not in tracker")
|
||||||
|
return
|
||||||
|
} else {
|
||||||
|
pi.failures++
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (bpt *bsPeerTracker) removePeer(p peer.ID) {
|
||||||
|
bpt.lk.Lock()
|
||||||
|
defer bpt.lk.Unlock()
|
||||||
|
delete(bpt.peers, p)
|
||||||
|
}
|
@ -1,11 +1,11 @@
|
|||||||
package chain
|
package blocksync
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
|
|
||||||
"github.com/filecoin-project/lotus/chain/types"
|
"github.com/filecoin-project/lotus/chain/types"
|
||||||
cid "github.com/ipfs/go-cid"
|
"github.com/ipfs/go-cid"
|
||||||
cbg "github.com/whyrusleeping/cbor-gen"
|
cbg "github.com/whyrusleeping/cbor-gen"
|
||||||
xerrors "golang.org/x/xerrors"
|
xerrors "golang.org/x/xerrors"
|
||||||
)
|
)
|
||||||
@ -15,11 +15,15 @@ import (
|
|||||||
var _ = xerrors.Errorf
|
var _ = xerrors.Errorf
|
||||||
|
|
||||||
func (t *BlockSyncRequest) MarshalCBOR(w io.Writer) error {
|
func (t *BlockSyncRequest) MarshalCBOR(w io.Writer) error {
|
||||||
|
if t == nil {
|
||||||
|
_, err := w.Write(cbg.CborNull)
|
||||||
|
return err
|
||||||
|
}
|
||||||
if _, err := w.Write([]byte{131}); err != nil {
|
if _, err := w.Write([]byte{131}); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
// t.t.Start ([]cid.Cid)
|
// t.t.Start ([]cid.Cid) (slice)
|
||||||
if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajArray, uint64(len(t.Start)))); err != nil {
|
if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajArray, uint64(len(t.Start)))); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -29,19 +33,20 @@ func (t *BlockSyncRequest) MarshalCBOR(w io.Writer) error {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// t.t.RequestLength (uint64)
|
// t.t.RequestLength (uint64) (uint64)
|
||||||
if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajUnsignedInt, t.RequestLength)); err != nil {
|
if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajUnsignedInt, uint64(t.RequestLength))); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
// t.t.Options (uint64)
|
// t.t.Options (uint64) (uint64)
|
||||||
if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajUnsignedInt, t.Options)); err != nil {
|
if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajUnsignedInt, uint64(t.Options))); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *BlockSyncRequest) UnmarshalCBOR(br io.Reader) error {
|
func (t *BlockSyncRequest) UnmarshalCBOR(r io.Reader) error {
|
||||||
|
br := cbg.GetPeeker(r)
|
||||||
|
|
||||||
maj, extra, err := cbg.CborReadHeader(br)
|
maj, extra, err := cbg.CborReadHeader(br)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -55,14 +60,14 @@ func (t *BlockSyncRequest) UnmarshalCBOR(br io.Reader) error {
|
|||||||
return fmt.Errorf("cbor input had wrong number of fields")
|
return fmt.Errorf("cbor input had wrong number of fields")
|
||||||
}
|
}
|
||||||
|
|
||||||
// t.t.Start ([]cid.Cid)
|
// t.t.Start ([]cid.Cid) (slice)
|
||||||
|
|
||||||
maj, extra, err = cbg.CborReadHeader(br)
|
maj, extra, err = cbg.CborReadHeader(br)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
if extra > 8192 {
|
if extra > 8192 {
|
||||||
return fmt.Errorf("array too large")
|
return fmt.Errorf("t.Start: array too large (%d)", extra)
|
||||||
}
|
}
|
||||||
|
|
||||||
if maj != cbg.MajArray {
|
if maj != cbg.MajArray {
|
||||||
@ -80,7 +85,7 @@ func (t *BlockSyncRequest) UnmarshalCBOR(br io.Reader) error {
|
|||||||
t.Start[i] = c
|
t.Start[i] = c
|
||||||
}
|
}
|
||||||
|
|
||||||
// t.t.RequestLength (uint64)
|
// t.t.RequestLength (uint64) (uint64)
|
||||||
|
|
||||||
maj, extra, err = cbg.CborReadHeader(br)
|
maj, extra, err = cbg.CborReadHeader(br)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -89,8 +94,8 @@ func (t *BlockSyncRequest) UnmarshalCBOR(br io.Reader) error {
|
|||||||
if maj != cbg.MajUnsignedInt {
|
if maj != cbg.MajUnsignedInt {
|
||||||
return fmt.Errorf("wrong type for uint64 field")
|
return fmt.Errorf("wrong type for uint64 field")
|
||||||
}
|
}
|
||||||
t.RequestLength = extra
|
t.RequestLength = uint64(extra)
|
||||||
// t.t.Options (uint64)
|
// t.t.Options (uint64) (uint64)
|
||||||
|
|
||||||
maj, extra, err = cbg.CborReadHeader(br)
|
maj, extra, err = cbg.CborReadHeader(br)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -99,16 +104,20 @@ func (t *BlockSyncRequest) UnmarshalCBOR(br io.Reader) error {
|
|||||||
if maj != cbg.MajUnsignedInt {
|
if maj != cbg.MajUnsignedInt {
|
||||||
return fmt.Errorf("wrong type for uint64 field")
|
return fmt.Errorf("wrong type for uint64 field")
|
||||||
}
|
}
|
||||||
t.Options = extra
|
t.Options = uint64(extra)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *BlockSyncResponse) MarshalCBOR(w io.Writer) error {
|
func (t *BlockSyncResponse) MarshalCBOR(w io.Writer) error {
|
||||||
|
if t == nil {
|
||||||
|
_, err := w.Write(cbg.CborNull)
|
||||||
|
return err
|
||||||
|
}
|
||||||
if _, err := w.Write([]byte{131}); err != nil {
|
if _, err := w.Write([]byte{131}); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
// t.t.Chain ([]*chain.BSTipSet)
|
// t.t.Chain ([]*blocksync.BSTipSet) (slice)
|
||||||
if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajArray, uint64(len(t.Chain)))); err != nil {
|
if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajArray, uint64(len(t.Chain)))); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -118,12 +127,12 @@ func (t *BlockSyncResponse) MarshalCBOR(w io.Writer) error {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// t.t.Status (uint64)
|
// t.t.Status (uint64) (uint64)
|
||||||
if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajUnsignedInt, t.Status)); err != nil {
|
if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajUnsignedInt, uint64(t.Status))); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
// t.t.Message (string)
|
// t.t.Message (string) (string)
|
||||||
if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajTextString, uint64(len(t.Message)))); err != nil {
|
if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajTextString, uint64(len(t.Message)))); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -133,7 +142,8 @@ func (t *BlockSyncResponse) MarshalCBOR(w io.Writer) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *BlockSyncResponse) UnmarshalCBOR(br io.Reader) error {
|
func (t *BlockSyncResponse) UnmarshalCBOR(r io.Reader) error {
|
||||||
|
br := cbg.GetPeeker(r)
|
||||||
|
|
||||||
maj, extra, err := cbg.CborReadHeader(br)
|
maj, extra, err := cbg.CborReadHeader(br)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -147,14 +157,14 @@ func (t *BlockSyncResponse) UnmarshalCBOR(br io.Reader) error {
|
|||||||
return fmt.Errorf("cbor input had wrong number of fields")
|
return fmt.Errorf("cbor input had wrong number of fields")
|
||||||
}
|
}
|
||||||
|
|
||||||
// t.t.Chain ([]*chain.BSTipSet)
|
// t.t.Chain ([]*blocksync.BSTipSet) (slice)
|
||||||
|
|
||||||
maj, extra, err = cbg.CborReadHeader(br)
|
maj, extra, err = cbg.CborReadHeader(br)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
if extra > 8192 {
|
if extra > 8192 {
|
||||||
return fmt.Errorf("array too large")
|
return fmt.Errorf("t.Chain: array too large (%d)", extra)
|
||||||
}
|
}
|
||||||
|
|
||||||
if maj != cbg.MajArray {
|
if maj != cbg.MajArray {
|
||||||
@ -164,6 +174,7 @@ func (t *BlockSyncResponse) UnmarshalCBOR(br io.Reader) error {
|
|||||||
t.Chain = make([]*BSTipSet, extra)
|
t.Chain = make([]*BSTipSet, extra)
|
||||||
}
|
}
|
||||||
for i := 0; i < int(extra); i++ {
|
for i := 0; i < int(extra); i++ {
|
||||||
|
|
||||||
var v BSTipSet
|
var v BSTipSet
|
||||||
if err := v.UnmarshalCBOR(br); err != nil {
|
if err := v.UnmarshalCBOR(br); err != nil {
|
||||||
return err
|
return err
|
||||||
@ -172,7 +183,7 @@ func (t *BlockSyncResponse) UnmarshalCBOR(br io.Reader) error {
|
|||||||
t.Chain[i] = &v
|
t.Chain[i] = &v
|
||||||
}
|
}
|
||||||
|
|
||||||
// t.t.Status (uint64)
|
// t.t.Status (uint64) (uint64)
|
||||||
|
|
||||||
maj, extra, err = cbg.CborReadHeader(br)
|
maj, extra, err = cbg.CborReadHeader(br)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -181,39 +192,30 @@ func (t *BlockSyncResponse) UnmarshalCBOR(br io.Reader) error {
|
|||||||
if maj != cbg.MajUnsignedInt {
|
if maj != cbg.MajUnsignedInt {
|
||||||
return fmt.Errorf("wrong type for uint64 field")
|
return fmt.Errorf("wrong type for uint64 field")
|
||||||
}
|
}
|
||||||
t.Status = extra
|
t.Status = uint64(extra)
|
||||||
// t.t.Message (string)
|
// t.t.Message (string) (string)
|
||||||
|
|
||||||
maj, extra, err = cbg.CborReadHeader(br)
|
{
|
||||||
|
sval, err := cbg.ReadString(br)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
if maj != cbg.MajTextString {
|
t.Message = string(sval)
|
||||||
return fmt.Errorf("expected cbor type 'text string' in input")
|
|
||||||
}
|
|
||||||
|
|
||||||
if extra > 256*1024 {
|
|
||||||
return fmt.Errorf("string in cbor input too long")
|
|
||||||
}
|
|
||||||
|
|
||||||
{
|
|
||||||
buf := make([]byte, extra)
|
|
||||||
if _, err := io.ReadFull(br, buf); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
t.Message = string(buf)
|
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *BSTipSet) MarshalCBOR(w io.Writer) error {
|
func (t *BSTipSet) MarshalCBOR(w io.Writer) error {
|
||||||
|
if t == nil {
|
||||||
|
_, err := w.Write(cbg.CborNull)
|
||||||
|
return err
|
||||||
|
}
|
||||||
if _, err := w.Write([]byte{133}); err != nil {
|
if _, err := w.Write([]byte{133}); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
// t.t.Blocks ([]*types.BlockHeader)
|
// t.t.Blocks ([]*types.BlockHeader) (slice)
|
||||||
if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajArray, uint64(len(t.Blocks)))); err != nil {
|
if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajArray, uint64(len(t.Blocks)))); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -223,7 +225,7 @@ func (t *BSTipSet) MarshalCBOR(w io.Writer) error {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// t.t.BlsMessages ([]*types.Message)
|
// t.t.BlsMessages ([]*types.Message) (slice)
|
||||||
if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajArray, uint64(len(t.BlsMessages)))); err != nil {
|
if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajArray, uint64(len(t.BlsMessages)))); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -233,7 +235,7 @@ func (t *BSTipSet) MarshalCBOR(w io.Writer) error {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// t.t.BlsMsgIncludes ([][]uint64)
|
// t.t.BlsMsgIncludes ([][]uint64) (slice)
|
||||||
if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajArray, uint64(len(t.BlsMsgIncludes)))); err != nil {
|
if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajArray, uint64(len(t.BlsMsgIncludes)))); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -248,7 +250,7 @@ func (t *BSTipSet) MarshalCBOR(w io.Writer) error {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// t.t.SecpkMessages ([]*types.SignedMessage)
|
// t.t.SecpkMessages ([]*types.SignedMessage) (slice)
|
||||||
if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajArray, uint64(len(t.SecpkMessages)))); err != nil {
|
if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajArray, uint64(len(t.SecpkMessages)))); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -258,7 +260,7 @@ func (t *BSTipSet) MarshalCBOR(w io.Writer) error {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// t.t.SecpkMsgIncludes ([][]uint64)
|
// t.t.SecpkMsgIncludes ([][]uint64) (slice)
|
||||||
if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajArray, uint64(len(t.SecpkMsgIncludes)))); err != nil {
|
if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajArray, uint64(len(t.SecpkMsgIncludes)))); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -275,7 +277,8 @@ func (t *BSTipSet) MarshalCBOR(w io.Writer) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *BSTipSet) UnmarshalCBOR(br io.Reader) error {
|
func (t *BSTipSet) UnmarshalCBOR(r io.Reader) error {
|
||||||
|
br := cbg.GetPeeker(r)
|
||||||
|
|
||||||
maj, extra, err := cbg.CborReadHeader(br)
|
maj, extra, err := cbg.CborReadHeader(br)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -289,14 +292,14 @@ func (t *BSTipSet) UnmarshalCBOR(br io.Reader) error {
|
|||||||
return fmt.Errorf("cbor input had wrong number of fields")
|
return fmt.Errorf("cbor input had wrong number of fields")
|
||||||
}
|
}
|
||||||
|
|
||||||
// t.t.Blocks ([]*types.BlockHeader)
|
// t.t.Blocks ([]*types.BlockHeader) (slice)
|
||||||
|
|
||||||
maj, extra, err = cbg.CborReadHeader(br)
|
maj, extra, err = cbg.CborReadHeader(br)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
if extra > 8192 {
|
if extra > 8192 {
|
||||||
return fmt.Errorf("array too large")
|
return fmt.Errorf("t.Blocks: array too large (%d)", extra)
|
||||||
}
|
}
|
||||||
|
|
||||||
if maj != cbg.MajArray {
|
if maj != cbg.MajArray {
|
||||||
@ -306,6 +309,7 @@ func (t *BSTipSet) UnmarshalCBOR(br io.Reader) error {
|
|||||||
t.Blocks = make([]*types.BlockHeader, extra)
|
t.Blocks = make([]*types.BlockHeader, extra)
|
||||||
}
|
}
|
||||||
for i := 0; i < int(extra); i++ {
|
for i := 0; i < int(extra); i++ {
|
||||||
|
|
||||||
var v types.BlockHeader
|
var v types.BlockHeader
|
||||||
if err := v.UnmarshalCBOR(br); err != nil {
|
if err := v.UnmarshalCBOR(br); err != nil {
|
||||||
return err
|
return err
|
||||||
@ -314,14 +318,14 @@ func (t *BSTipSet) UnmarshalCBOR(br io.Reader) error {
|
|||||||
t.Blocks[i] = &v
|
t.Blocks[i] = &v
|
||||||
}
|
}
|
||||||
|
|
||||||
// t.t.BlsMessages ([]*types.Message)
|
// t.t.BlsMessages ([]*types.Message) (slice)
|
||||||
|
|
||||||
maj, extra, err = cbg.CborReadHeader(br)
|
maj, extra, err = cbg.CborReadHeader(br)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
if extra > 8192 {
|
if extra > 8192 {
|
||||||
return fmt.Errorf("array too large")
|
return fmt.Errorf("t.BlsMessages: array too large (%d)", extra)
|
||||||
}
|
}
|
||||||
|
|
||||||
if maj != cbg.MajArray {
|
if maj != cbg.MajArray {
|
||||||
@ -331,6 +335,7 @@ func (t *BSTipSet) UnmarshalCBOR(br io.Reader) error {
|
|||||||
t.BlsMessages = make([]*types.Message, extra)
|
t.BlsMessages = make([]*types.Message, extra)
|
||||||
}
|
}
|
||||||
for i := 0; i < int(extra); i++ {
|
for i := 0; i < int(extra); i++ {
|
||||||
|
|
||||||
var v types.Message
|
var v types.Message
|
||||||
if err := v.UnmarshalCBOR(br); err != nil {
|
if err := v.UnmarshalCBOR(br); err != nil {
|
||||||
return err
|
return err
|
||||||
@ -339,14 +344,14 @@ func (t *BSTipSet) UnmarshalCBOR(br io.Reader) error {
|
|||||||
t.BlsMessages[i] = &v
|
t.BlsMessages[i] = &v
|
||||||
}
|
}
|
||||||
|
|
||||||
// t.t.BlsMsgIncludes ([][]uint64)
|
// t.t.BlsMsgIncludes ([][]uint64) (slice)
|
||||||
|
|
||||||
maj, extra, err = cbg.CborReadHeader(br)
|
maj, extra, err = cbg.CborReadHeader(br)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
if extra > 8192 {
|
if extra > 8192 {
|
||||||
return fmt.Errorf("array too large")
|
return fmt.Errorf("t.BlsMsgIncludes: array too large (%d)", extra)
|
||||||
}
|
}
|
||||||
|
|
||||||
if maj != cbg.MajArray {
|
if maj != cbg.MajArray {
|
||||||
@ -366,7 +371,7 @@ func (t *BSTipSet) UnmarshalCBOR(br io.Reader) error {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
if extra > 8192 {
|
if extra > 8192 {
|
||||||
return fmt.Errorf("array too large")
|
return fmt.Errorf("t.BlsMsgIncludes[i]: array too large (%d)", extra)
|
||||||
}
|
}
|
||||||
|
|
||||||
if maj != cbg.MajArray {
|
if maj != cbg.MajArray {
|
||||||
@ -392,14 +397,14 @@ func (t *BSTipSet) UnmarshalCBOR(br io.Reader) error {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// t.t.SecpkMessages ([]*types.SignedMessage)
|
// t.t.SecpkMessages ([]*types.SignedMessage) (slice)
|
||||||
|
|
||||||
maj, extra, err = cbg.CborReadHeader(br)
|
maj, extra, err = cbg.CborReadHeader(br)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
if extra > 8192 {
|
if extra > 8192 {
|
||||||
return fmt.Errorf("array too large")
|
return fmt.Errorf("t.SecpkMessages: array too large (%d)", extra)
|
||||||
}
|
}
|
||||||
|
|
||||||
if maj != cbg.MajArray {
|
if maj != cbg.MajArray {
|
||||||
@ -409,6 +414,7 @@ func (t *BSTipSet) UnmarshalCBOR(br io.Reader) error {
|
|||||||
t.SecpkMessages = make([]*types.SignedMessage, extra)
|
t.SecpkMessages = make([]*types.SignedMessage, extra)
|
||||||
}
|
}
|
||||||
for i := 0; i < int(extra); i++ {
|
for i := 0; i < int(extra); i++ {
|
||||||
|
|
||||||
var v types.SignedMessage
|
var v types.SignedMessage
|
||||||
if err := v.UnmarshalCBOR(br); err != nil {
|
if err := v.UnmarshalCBOR(br); err != nil {
|
||||||
return err
|
return err
|
||||||
@ -417,14 +423,14 @@ func (t *BSTipSet) UnmarshalCBOR(br io.Reader) error {
|
|||||||
t.SecpkMessages[i] = &v
|
t.SecpkMessages[i] = &v
|
||||||
}
|
}
|
||||||
|
|
||||||
// t.t.SecpkMsgIncludes ([][]uint64)
|
// t.t.SecpkMsgIncludes ([][]uint64) (slice)
|
||||||
|
|
||||||
maj, extra, err = cbg.CborReadHeader(br)
|
maj, extra, err = cbg.CborReadHeader(br)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
if extra > 8192 {
|
if extra > 8192 {
|
||||||
return fmt.Errorf("array too large")
|
return fmt.Errorf("t.SecpkMsgIncludes: array too large (%d)", extra)
|
||||||
}
|
}
|
||||||
|
|
||||||
if maj != cbg.MajArray {
|
if maj != cbg.MajArray {
|
||||||
@ -444,7 +450,7 @@ func (t *BSTipSet) UnmarshalCBOR(br io.Reader) error {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
if extra > 8192 {
|
if extra > 8192 {
|
||||||
return fmt.Errorf("array too large")
|
return fmt.Errorf("t.SecpkMsgIncludes[i]: array too large (%d)", extra)
|
||||||
}
|
}
|
||||||
|
|
||||||
if maj != cbg.MajArray {
|
if maj != cbg.MajArray {
|
@ -231,7 +231,7 @@ func (cs *ChainStore) MaybeTakeHeavierTipSet(ctx context.Context, ts *types.TipS
|
|||||||
// TODO: don't do this for initial sync. Now that we don't have a
|
// TODO: don't do this for initial sync. Now that we don't have a
|
||||||
// difference between 'bootstrap sync' and 'caught up' sync, we need
|
// difference between 'bootstrap sync' and 'caught up' sync, we need
|
||||||
// some other heuristic.
|
// some other heuristic.
|
||||||
return cs.takeHeaviestTipSet(ts)
|
return cs.takeHeaviestTipSet(ctx, ts)
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
@ -267,7 +267,10 @@ func (cs *ChainStore) reorgWorker(ctx context.Context) chan<- reorg {
|
|||||||
return out
|
return out
|
||||||
}
|
}
|
||||||
|
|
||||||
func (cs *ChainStore) takeHeaviestTipSet(ts *types.TipSet) error {
|
func (cs *ChainStore) takeHeaviestTipSet(ctx context.Context, ts *types.TipSet) error {
|
||||||
|
ctx, span := trace.StartSpan(ctx, "takeHeaviestTipSet")
|
||||||
|
defer span.End()
|
||||||
|
|
||||||
if cs.heaviest != nil { // buf
|
if cs.heaviest != nil { // buf
|
||||||
if len(cs.reorgCh) > 0 {
|
if len(cs.reorgCh) > 0 {
|
||||||
log.Warnf("Reorg channel running behind, %d reorgs buffered", len(cs.reorgCh))
|
log.Warnf("Reorg channel running behind, %d reorgs buffered", len(cs.reorgCh))
|
||||||
@ -280,6 +283,8 @@ func (cs *ChainStore) takeHeaviestTipSet(ts *types.TipSet) error {
|
|||||||
log.Warnf("no heaviest tipset found, using %s", ts.Cids())
|
log.Warnf("no heaviest tipset found, using %s", ts.Cids())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
span.AddAttributes(trace.BoolAttribute("newHead", true))
|
||||||
|
|
||||||
log.Debugf("New heaviest tipset! %s", ts.Cids())
|
log.Debugf("New heaviest tipset! %s", ts.Cids())
|
||||||
cs.heaviest = ts
|
cs.heaviest = ts
|
||||||
|
|
||||||
@ -296,7 +301,7 @@ func (cs *ChainStore) takeHeaviestTipSet(ts *types.TipSet) error {
|
|||||||
func (cs *ChainStore) SetHead(ts *types.TipSet) error {
|
func (cs *ChainStore) SetHead(ts *types.TipSet) error {
|
||||||
cs.heaviestLk.Lock()
|
cs.heaviestLk.Lock()
|
||||||
defer cs.heaviestLk.Unlock()
|
defer cs.heaviestLk.Unlock()
|
||||||
return cs.takeHeaviestTipSet(ts)
|
return cs.takeHeaviestTipSet(context.TODO(), ts)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (cs *ChainStore) Contains(ts *types.TipSet) (bool, error) {
|
func (cs *ChainStore) Contains(ts *types.TipSet) (bool, error) {
|
||||||
|
@ -12,6 +12,7 @@ import (
|
|||||||
"github.com/filecoin-project/lotus/build"
|
"github.com/filecoin-project/lotus/build"
|
||||||
"github.com/filecoin-project/lotus/chain/actors"
|
"github.com/filecoin-project/lotus/chain/actors"
|
||||||
"github.com/filecoin-project/lotus/chain/address"
|
"github.com/filecoin-project/lotus/chain/address"
|
||||||
|
"github.com/filecoin-project/lotus/chain/blocksync"
|
||||||
"github.com/filecoin-project/lotus/chain/state"
|
"github.com/filecoin-project/lotus/chain/state"
|
||||||
"github.com/filecoin-project/lotus/chain/stmgr"
|
"github.com/filecoin-project/lotus/chain/stmgr"
|
||||||
"github.com/filecoin-project/lotus/chain/store"
|
"github.com/filecoin-project/lotus/chain/store"
|
||||||
@ -49,7 +50,7 @@ type Syncer struct {
|
|||||||
bad *BadBlockCache
|
bad *BadBlockCache
|
||||||
|
|
||||||
// handle to the block sync service
|
// handle to the block sync service
|
||||||
Bsync *BlockSync
|
Bsync *blocksync.BlockSync
|
||||||
|
|
||||||
self peer.ID
|
self peer.ID
|
||||||
|
|
||||||
@ -61,7 +62,7 @@ type Syncer struct {
|
|||||||
peerHeadsLk sync.Mutex
|
peerHeadsLk sync.Mutex
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewSyncer(sm *stmgr.StateManager, bsync *BlockSync, self peer.ID) (*Syncer, error) {
|
func NewSyncer(sm *stmgr.StateManager, bsync *blocksync.BlockSync, self peer.ID) (*Syncer, error) {
|
||||||
gen, err := sm.ChainStore().GetGenesis()
|
gen, err := sm.ChainStore().GetGenesis()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
@ -111,14 +112,22 @@ func (syncer *Syncer) InformNewHead(from peer.ID, fts *store.FullTipSet) {
|
|||||||
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
syncer.peerHeadsLk.Lock()
|
syncer.peerHeadsLk.Lock()
|
||||||
syncer.peerHeads[from] = fts.TipSet()
|
syncer.peerHeads[from] = fts.TipSet()
|
||||||
syncer.peerHeadsLk.Unlock()
|
syncer.peerHeadsLk.Unlock()
|
||||||
syncer.Bsync.AddPeer(from)
|
syncer.Bsync.AddPeer(from)
|
||||||
|
|
||||||
|
bestPweight := syncer.store.GetHeaviestTipSet().Blocks()[0].ParentWeight
|
||||||
|
targetWeight := fts.TipSet().Blocks()[0].ParentWeight
|
||||||
|
if targetWeight.LessThan(bestPweight) {
|
||||||
|
log.Warn("incoming tipset does not appear to be better than our best chain, ignoring for now")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
if err := syncer.Sync(ctx, fts.TipSet()); err != nil {
|
if err := syncer.Sync(ctx, fts.TipSet()); err != nil {
|
||||||
log.Errorf("sync error: %+v", err)
|
log.Errorf("sync error (curW=%s, targetW=%s): %+v", bestPweight, targetWeight, err)
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
}
|
}
|
||||||
@ -350,6 +359,12 @@ func (syncer *Syncer) tryLoadFullTipSet(cids []cid.Cid) (*store.FullTipSet, erro
|
|||||||
func (syncer *Syncer) Sync(ctx context.Context, maybeHead *types.TipSet) error {
|
func (syncer *Syncer) Sync(ctx context.Context, maybeHead *types.TipSet) error {
|
||||||
ctx, span := trace.StartSpan(ctx, "chain.Sync")
|
ctx, span := trace.StartSpan(ctx, "chain.Sync")
|
||||||
defer span.End()
|
defer span.End()
|
||||||
|
if span.IsRecordingEvents() {
|
||||||
|
span.AddAttributes(
|
||||||
|
trace.StringAttribute("tipset", fmt.Sprint(maybeHead.Cids())),
|
||||||
|
trace.Int64Attribute("height", int64(maybeHead.Height())),
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
syncer.syncLock.Lock()
|
syncer.syncLock.Lock()
|
||||||
defer syncer.syncLock.Unlock()
|
defer syncer.syncLock.Unlock()
|
||||||
@ -359,10 +374,12 @@ func (syncer *Syncer) Sync(ctx context.Context, maybeHead *types.TipSet) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
if err := syncer.collectChain(ctx, maybeHead); err != nil {
|
if err := syncer.collectChain(ctx, maybeHead); err != nil {
|
||||||
|
span.AddAttributes(trace.StringAttribute("col_error", err.Error()))
|
||||||
return xerrors.Errorf("collectChain failed: %w", err)
|
return xerrors.Errorf("collectChain failed: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := syncer.store.PutTipSet(ctx, maybeHead); err != nil {
|
if err := syncer.store.PutTipSet(ctx, maybeHead); err != nil {
|
||||||
|
span.AddAttributes(trace.StringAttribute("put_error", err.Error()))
|
||||||
return xerrors.Errorf("failed to put synced tipset to chainstore: %w", err)
|
return xerrors.Errorf("failed to put synced tipset to chainstore: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -682,6 +699,15 @@ func (syncer *Syncer) collectHeaders(ctx context.Context, from *types.TipSet, to
|
|||||||
trace.Int64Attribute("toHeight", int64(to.Height())),
|
trace.Int64Attribute("toHeight", int64(to.Height())),
|
||||||
)
|
)
|
||||||
|
|
||||||
|
for _, pcid := range from.Parents() {
|
||||||
|
if syncer.bad.Has(pcid) {
|
||||||
|
for _, b := range from.Cids() {
|
||||||
|
syncer.bad.Add(b)
|
||||||
|
}
|
||||||
|
return nil, xerrors.Errorf("chain linked to block marked previously as bad (%s, %s)", from.Cids(), pcid)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
blockSet := []*types.TipSet{from}
|
blockSet := []*types.TipSet{from}
|
||||||
|
|
||||||
at := from.Parents()
|
at := from.Parents()
|
||||||
@ -724,6 +750,8 @@ loop:
|
|||||||
|
|
||||||
log.Errorf("failed to get blocks: %+v", err)
|
log.Errorf("failed to get blocks: %+v", err)
|
||||||
|
|
||||||
|
span.AddAttributes(trace.StringAttribute("error", err.Error()))
|
||||||
|
|
||||||
// This error will only be logged above,
|
// This error will only be logged above,
|
||||||
return nil, xerrors.Errorf("failed to get blocks: %w", err)
|
return nil, xerrors.Errorf("failed to get blocks: %w", err)
|
||||||
}
|
}
|
||||||
@ -756,6 +784,13 @@ loop:
|
|||||||
log.Warnf("(fork detected) synced header chain (%s - %d) does not link to our best block (%s - %d)", from.Cids(), from.Height(), to.Cids(), to.Height())
|
log.Warnf("(fork detected) synced header chain (%s - %d) does not link to our best block (%s - %d)", from.Cids(), from.Height(), to.Cids(), to.Height())
|
||||||
fork, err := syncer.syncFork(ctx, last, to)
|
fork, err := syncer.syncFork(ctx, last, to)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
if xerrors.Is(err, ErrForkTooLong) {
|
||||||
|
// TODO: we're marking this block bad in the same way that we mark invalid blocks bad. Maybe distinguish?
|
||||||
|
log.Warn("adding forked chain to our bad tipset cache")
|
||||||
|
for _, b := range from.Blocks() {
|
||||||
|
syncer.bad.Add(b.Cid())
|
||||||
|
}
|
||||||
|
}
|
||||||
return nil, xerrors.Errorf("failed to sync fork: %w", err)
|
return nil, xerrors.Errorf("failed to sync fork: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -765,6 +800,8 @@ loop:
|
|||||||
return blockSet, nil
|
return blockSet, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var ErrForkTooLong = fmt.Errorf("fork longer than threshold")
|
||||||
|
|
||||||
func (syncer *Syncer) syncFork(ctx context.Context, from *types.TipSet, to *types.TipSet) ([]*types.TipSet, error) {
|
func (syncer *Syncer) syncFork(ctx context.Context, from *types.TipSet, to *types.TipSet) ([]*types.TipSet, error) {
|
||||||
tips, err := syncer.Bsync.GetBlocks(ctx, from.Parents(), build.ForkLengthThreshold)
|
tips, err := syncer.Bsync.GetBlocks(ctx, from.Parents(), build.ForkLengthThreshold)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -791,7 +828,7 @@ func (syncer *Syncer) syncFork(ctx context.Context, from *types.TipSet, to *type
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return nil, xerrors.Errorf("fork was longer than our threshold")
|
return nil, ErrForkTooLong
|
||||||
}
|
}
|
||||||
|
|
||||||
func (syncer *Syncer) syncMessagesAndCheckState(ctx context.Context, headers []*types.TipSet) error {
|
func (syncer *Syncer) syncMessagesAndCheckState(ctx context.Context, headers []*types.TipSet) error {
|
||||||
@ -873,7 +910,7 @@ func (syncer *Syncer) iterFullTipsets(ctx context.Context, headers []*types.TipS
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func persistMessages(bs bstore.Blockstore, bst *BSTipSet) error {
|
func persistMessages(bs bstore.Blockstore, bst *blocksync.BSTipSet) error {
|
||||||
for _, m := range bst.BlsMessages {
|
for _, m := range bst.BlsMessages {
|
||||||
//log.Infof("putting BLS message: %s", m.Cid())
|
//log.Infof("putting BLS message: %s", m.Cid())
|
||||||
if _, err := store.PutMessage(bs, m); err != nil {
|
if _, err := store.PutMessage(bs, m); err != nil {
|
||||||
@ -906,6 +943,8 @@ func (syncer *Syncer) collectChain(ctx context.Context, ts *types.TipSet) error
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
span.AddAttributes(trace.Int64Attribute("syncChainLength", int64(len(headers))))
|
||||||
|
|
||||||
if !headers[0].Equals(ts) {
|
if !headers[0].Equals(ts) {
|
||||||
log.Errorf("collectChain headers[0] should be equal to sync target. Its not: %s != %s", headers[0].Cids(), ts.Cids())
|
log.Errorf("collectChain headers[0] should be equal to sync target. Its not: %s != %s", headers[0].Cids(), ts.Cids())
|
||||||
}
|
}
|
||||||
|
@ -5,7 +5,7 @@ import (
|
|||||||
"io"
|
"io"
|
||||||
"math"
|
"math"
|
||||||
|
|
||||||
cid "github.com/ipfs/go-cid"
|
"github.com/ipfs/go-cid"
|
||||||
cbg "github.com/whyrusleeping/cbor-gen"
|
cbg "github.com/whyrusleeping/cbor-gen"
|
||||||
xerrors "golang.org/x/xerrors"
|
xerrors "golang.org/x/xerrors"
|
||||||
)
|
)
|
||||||
|
11
gen/main.go
11
gen/main.go
@ -8,6 +8,7 @@ import (
|
|||||||
|
|
||||||
"github.com/filecoin-project/lotus/api"
|
"github.com/filecoin-project/lotus/api"
|
||||||
"github.com/filecoin-project/lotus/chain/actors"
|
"github.com/filecoin-project/lotus/chain/actors"
|
||||||
|
"github.com/filecoin-project/lotus/chain/blocksync"
|
||||||
"github.com/filecoin-project/lotus/chain/deals"
|
"github.com/filecoin-project/lotus/chain/deals"
|
||||||
"github.com/filecoin-project/lotus/chain/types"
|
"github.com/filecoin-project/lotus/chain/types"
|
||||||
"github.com/filecoin-project/lotus/paych"
|
"github.com/filecoin-project/lotus/paych"
|
||||||
@ -71,17 +72,15 @@ func main() {
|
|||||||
os.Exit(1)
|
os.Exit(1)
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
err = gen.WriteTupleEncodersToFile("./chain/blocksync/cbor_gen.go", "blocksync",
|
||||||
err = gen.WriteTupleEncodersToFile("./chain/cbor_gen.go", "chain",
|
blocksync.BlockSyncRequest{},
|
||||||
chain.BlockSyncRequest{},
|
blocksync.BlockSyncResponse{},
|
||||||
chain.BlockSyncResponse{},
|
blocksync.BSTipSet{},
|
||||||
chain.BSTipSet{},
|
|
||||||
)
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
fmt.Println(err)
|
fmt.Println(err)
|
||||||
os.Exit(1)
|
os.Exit(1)
|
||||||
}
|
}
|
||||||
*/
|
|
||||||
|
|
||||||
err = gen.WriteTupleEncodersToFile("./chain/actors/cbor_gen.go", "actors",
|
err = gen.WriteTupleEncodersToFile("./chain/actors/cbor_gen.go", "actors",
|
||||||
actors.InitActorState{},
|
actors.InitActorState{},
|
||||||
|
2
go.mod
2
go.mod
@ -45,11 +45,13 @@ require (
|
|||||||
github.com/libp2p/go-libp2p-connmgr v0.1.0
|
github.com/libp2p/go-libp2p-connmgr v0.1.0
|
||||||
github.com/libp2p/go-libp2p-core v0.2.2
|
github.com/libp2p/go-libp2p-core v0.2.2
|
||||||
github.com/libp2p/go-libp2p-discovery v0.1.0
|
github.com/libp2p/go-libp2p-discovery v0.1.0
|
||||||
|
github.com/libp2p/go-libp2p-host v0.1.0
|
||||||
github.com/libp2p/go-libp2p-kad-dht v0.1.1
|
github.com/libp2p/go-libp2p-kad-dht v0.1.1
|
||||||
github.com/libp2p/go-libp2p-mplex v0.2.1
|
github.com/libp2p/go-libp2p-mplex v0.2.1
|
||||||
github.com/libp2p/go-libp2p-peer v0.2.0
|
github.com/libp2p/go-libp2p-peer v0.2.0
|
||||||
github.com/libp2p/go-libp2p-peerstore v0.1.3
|
github.com/libp2p/go-libp2p-peerstore v0.1.3
|
||||||
github.com/libp2p/go-libp2p-pnet v0.1.0
|
github.com/libp2p/go-libp2p-pnet v0.1.0
|
||||||
|
github.com/libp2p/go-libp2p-protocol v0.1.0
|
||||||
github.com/libp2p/go-libp2p-pubsub v0.1.0
|
github.com/libp2p/go-libp2p-pubsub v0.1.0
|
||||||
github.com/libp2p/go-libp2p-quic-transport v0.1.1
|
github.com/libp2p/go-libp2p-quic-transport v0.1.1
|
||||||
github.com/libp2p/go-libp2p-record v0.1.1
|
github.com/libp2p/go-libp2p-record v0.1.1
|
||||||
|
9
go.sum
9
go.sum
@ -22,7 +22,9 @@ github.com/Stebalien/go-bitfield v0.0.1/go.mod h1:GNjFpasyUVkHMsfEOk8EFLJ9syQ6SI
|
|||||||
github.com/aead/siphash v1.0.1/go.mod h1:Nywa3cDsYNNK3gaciGTWPwHt0wlpNV15vwmswBAUSII=
|
github.com/aead/siphash v1.0.1/go.mod h1:Nywa3cDsYNNK3gaciGTWPwHt0wlpNV15vwmswBAUSII=
|
||||||
github.com/akavel/rsrc v0.8.0 h1:zjWn7ukO9Kc5Q62DOJCcxGpXC18RawVtYAGdz2aLlfw=
|
github.com/akavel/rsrc v0.8.0 h1:zjWn7ukO9Kc5Q62DOJCcxGpXC18RawVtYAGdz2aLlfw=
|
||||||
github.com/akavel/rsrc v0.8.0/go.mod h1:uLoCtb9J+EyAqh+26kdrTgmzRBFPGOolLWKpdxkKq+c=
|
github.com/akavel/rsrc v0.8.0/go.mod h1:uLoCtb9J+EyAqh+26kdrTgmzRBFPGOolLWKpdxkKq+c=
|
||||||
|
github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc h1:cAKDfWh5VpdgMhJosfJnn5/FoN2SRZ4p7fJNX58YPaU=
|
||||||
github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
|
github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
|
||||||
|
github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf h1:qet1QNfXsQxTZqLG4oE62mJzwPIB8+Tee4RNCL9ulrY=
|
||||||
github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
|
github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
|
||||||
github.com/apache/thrift v0.12.0 h1:pODnxUFNcjP9UTLZGTdeh+j16A8lJbRvD3rOtrk/7bs=
|
github.com/apache/thrift v0.12.0 h1:pODnxUFNcjP9UTLZGTdeh+j16A8lJbRvD3rOtrk/7bs=
|
||||||
github.com/apache/thrift v0.12.0/go.mod h1:cp2SuWMxlEZw2r+iP2GNCdIi4C1qmUzdZFSVb+bacwQ=
|
github.com/apache/thrift v0.12.0/go.mod h1:cp2SuWMxlEZw2r+iP2GNCdIi4C1qmUzdZFSVb+bacwQ=
|
||||||
@ -303,6 +305,8 @@ github.com/libp2p/go-libp2p-crypto v0.1.0 h1:k9MFy+o2zGDNGsaoZl0MA3iZ75qXxr9OOoA
|
|||||||
github.com/libp2p/go-libp2p-crypto v0.1.0/go.mod h1:sPUokVISZiy+nNuTTH/TY+leRSxnFj/2GLjtOTW90hI=
|
github.com/libp2p/go-libp2p-crypto v0.1.0/go.mod h1:sPUokVISZiy+nNuTTH/TY+leRSxnFj/2GLjtOTW90hI=
|
||||||
github.com/libp2p/go-libp2p-discovery v0.1.0 h1:j+R6cokKcGbnZLf4kcNwpx6mDEUPF3N6SrqMymQhmvs=
|
github.com/libp2p/go-libp2p-discovery v0.1.0 h1:j+R6cokKcGbnZLf4kcNwpx6mDEUPF3N6SrqMymQhmvs=
|
||||||
github.com/libp2p/go-libp2p-discovery v0.1.0/go.mod h1:4F/x+aldVHjHDHuX85x1zWoFTGElt8HnoDzwkFZm29g=
|
github.com/libp2p/go-libp2p-discovery v0.1.0/go.mod h1:4F/x+aldVHjHDHuX85x1zWoFTGElt8HnoDzwkFZm29g=
|
||||||
|
github.com/libp2p/go-libp2p-host v0.1.0 h1:OZwENiFm6JOK3YR5PZJxkXlJE8a5u8g4YvAUrEV2MjM=
|
||||||
|
github.com/libp2p/go-libp2p-host v0.1.0/go.mod h1:5+fWuLbDn8OxoxPN3CV0vsLe1hAKScSMbT84qRfxum8=
|
||||||
github.com/libp2p/go-libp2p-kad-dht v0.1.1 h1:IH6NQuoUv5w5e1O8Jc3KyVDtr0rNd0G9aaADpLI1xVo=
|
github.com/libp2p/go-libp2p-kad-dht v0.1.1 h1:IH6NQuoUv5w5e1O8Jc3KyVDtr0rNd0G9aaADpLI1xVo=
|
||||||
github.com/libp2p/go-libp2p-kad-dht v0.1.1/go.mod h1:1kj2Rk5pX3/0RwqMm9AMNCT7DzcMHYhgDN5VTi+cY0M=
|
github.com/libp2p/go-libp2p-kad-dht v0.1.1/go.mod h1:1kj2Rk5pX3/0RwqMm9AMNCT7DzcMHYhgDN5VTi+cY0M=
|
||||||
github.com/libp2p/go-libp2p-kbucket v0.2.0 h1:FB2a0VkOTNGTP5gu/I444u4WabNM9V1zCkQcWb7zajI=
|
github.com/libp2p/go-libp2p-kbucket v0.2.0 h1:FB2a0VkOTNGTP5gu/I444u4WabNM9V1zCkQcWb7zajI=
|
||||||
@ -323,6 +327,8 @@ github.com/libp2p/go-libp2p-peerstore v0.1.3 h1:wMgajt1uM2tMiqf4M+4qWKVyyFc8SfA+
|
|||||||
github.com/libp2p/go-libp2p-peerstore v0.1.3/go.mod h1:BJ9sHlm59/80oSkpWgr1MyY1ciXAXV397W6h1GH/uKI=
|
github.com/libp2p/go-libp2p-peerstore v0.1.3/go.mod h1:BJ9sHlm59/80oSkpWgr1MyY1ciXAXV397W6h1GH/uKI=
|
||||||
github.com/libp2p/go-libp2p-pnet v0.1.0 h1:kRUES28dktfnHNIRW4Ro78F7rKBHBiw5MJpl0ikrLIA=
|
github.com/libp2p/go-libp2p-pnet v0.1.0 h1:kRUES28dktfnHNIRW4Ro78F7rKBHBiw5MJpl0ikrLIA=
|
||||||
github.com/libp2p/go-libp2p-pnet v0.1.0/go.mod h1:ZkyZw3d0ZFOex71halXRihWf9WH/j3OevcJdTmD0lyE=
|
github.com/libp2p/go-libp2p-pnet v0.1.0/go.mod h1:ZkyZw3d0ZFOex71halXRihWf9WH/j3OevcJdTmD0lyE=
|
||||||
|
github.com/libp2p/go-libp2p-protocol v0.1.0 h1:HdqhEyhg0ToCaxgMhnOmUO8snQtt/kQlcjVk3UoJU3c=
|
||||||
|
github.com/libp2p/go-libp2p-protocol v0.1.0/go.mod h1:KQPHpAabB57XQxGrXCNvbL6UEXfQqUgC/1adR2Xtflk=
|
||||||
github.com/libp2p/go-libp2p-pubsub v0.1.0 h1:SmQeMa7IUv5vadh0fYgYsafWCBA1sCy5d/68kIYqGcU=
|
github.com/libp2p/go-libp2p-pubsub v0.1.0 h1:SmQeMa7IUv5vadh0fYgYsafWCBA1sCy5d/68kIYqGcU=
|
||||||
github.com/libp2p/go-libp2p-pubsub v0.1.0/go.mod h1:ZwlKzRSe1eGvSIdU5bD7+8RZN/Uzw0t1Bp9R1znpR/Q=
|
github.com/libp2p/go-libp2p-pubsub v0.1.0/go.mod h1:ZwlKzRSe1eGvSIdU5bD7+8RZN/Uzw0t1Bp9R1znpR/Q=
|
||||||
github.com/libp2p/go-libp2p-quic-transport v0.1.1 h1:MFMJzvsxIEDEVKzO89BnB/FgvMj9WI4GDGUW2ArDPUA=
|
github.com/libp2p/go-libp2p-quic-transport v0.1.1 h1:MFMJzvsxIEDEVKzO89BnB/FgvMj9WI4GDGUW2ArDPUA=
|
||||||
@ -474,6 +480,7 @@ github.com/prometheus/client_golang v0.9.1/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXP
|
|||||||
github.com/prometheus/client_golang v0.9.3-0.20190127221311-3c4408c8b829/go.mod h1:p2iRAGwDERtqlqzRXnrOVns+ignqQo//hLXqYxZYVNs=
|
github.com/prometheus/client_golang v0.9.3-0.20190127221311-3c4408c8b829/go.mod h1:p2iRAGwDERtqlqzRXnrOVns+ignqQo//hLXqYxZYVNs=
|
||||||
github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo=
|
github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo=
|
||||||
github.com/prometheus/client_model v0.0.0-20190115171406-56726106282f/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo=
|
github.com/prometheus/client_model v0.0.0-20190115171406-56726106282f/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo=
|
||||||
|
github.com/prometheus/common v0.2.0 h1:kUZDBDTdBVBYBj5Tmh2NZLlF60mfjA27rM34b+cVwNU=
|
||||||
github.com/prometheus/common v0.2.0/go.mod h1:TNfzLD0ON7rHzMJeJkieUDPYmFC7Snx/y86RQel1bk4=
|
github.com/prometheus/common v0.2.0/go.mod h1:TNfzLD0ON7rHzMJeJkieUDPYmFC7Snx/y86RQel1bk4=
|
||||||
github.com/prometheus/procfs v0.0.0-20181005140218-185b4288413d/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk=
|
github.com/prometheus/procfs v0.0.0-20181005140218-185b4288413d/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk=
|
||||||
github.com/prometheus/procfs v0.0.0-20190117184657-bf6a532e95b1/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk=
|
github.com/prometheus/procfs v0.0.0-20190117184657-bf6a532e95b1/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk=
|
||||||
@ -481,6 +488,7 @@ github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqn
|
|||||||
github.com/russross/blackfriday v1.5.2/go.mod h1:JO/DiYxRf+HjHt06OyowR9PTA263kcR/rfWxYHBV53g=
|
github.com/russross/blackfriday v1.5.2/go.mod h1:JO/DiYxRf+HjHt06OyowR9PTA263kcR/rfWxYHBV53g=
|
||||||
github.com/shirou/gopsutil v2.18.12+incompatible h1:1eaJvGomDnH74/5cF4CTmTbLHAriGFsTZppLXDX93OM=
|
github.com/shirou/gopsutil v2.18.12+incompatible h1:1eaJvGomDnH74/5cF4CTmTbLHAriGFsTZppLXDX93OM=
|
||||||
github.com/shirou/gopsutil v2.18.12+incompatible/go.mod h1:5b4v6he4MtMOwMlS0TUMTu2PcXUg8+E1lC7eC3UO/RA=
|
github.com/shirou/gopsutil v2.18.12+incompatible/go.mod h1:5b4v6he4MtMOwMlS0TUMTu2PcXUg8+E1lC7eC3UO/RA=
|
||||||
|
github.com/sirupsen/logrus v1.2.0 h1:juTguoYk5qI21pwyTXY3B3Y5cOTH3ZUyZCg1v/mihuo=
|
||||||
github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
|
github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
|
||||||
github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d/go.mod h1:OnSkiWE9lh6wB0YB77sQom3nweQdgAjqCqsofrRNTgc=
|
github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d/go.mod h1:OnSkiWE9lh6wB0YB77sQom3nweQdgAjqCqsofrRNTgc=
|
||||||
github.com/smartystreets/assertions v1.0.0/go.mod h1:kHHU4qYBaI3q23Pp3VPrmWhuIUrLW/7eUrw0BU5VaoM=
|
github.com/smartystreets/assertions v1.0.0/go.mod h1:kHHU4qYBaI3q23Pp3VPrmWhuIUrLW/7eUrw0BU5VaoM=
|
||||||
@ -683,6 +691,7 @@ google.golang.org/genproto v0.0.0-20190502173448-54afdca5d873/go.mod h1:VzzqZJRn
|
|||||||
google.golang.org/grpc v1.17.0/go.mod h1:6QZJwpn2B+Zp71q/5VxRsJ6NXXVCE5NRUHRo+f3cWCs=
|
google.golang.org/grpc v1.17.0/go.mod h1:6QZJwpn2B+Zp71q/5VxRsJ6NXXVCE5NRUHRo+f3cWCs=
|
||||||
google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c=
|
google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c=
|
||||||
google.golang.org/grpc v1.20.1/go.mod h1:10oTOabMzJvdu6/UiuZezV6QK5dSlG84ov/aaiqXj38=
|
google.golang.org/grpc v1.20.1/go.mod h1:10oTOabMzJvdu6/UiuZezV6QK5dSlG84ov/aaiqXj38=
|
||||||
|
gopkg.in/alecthomas/kingpin.v2 v2.2.6 h1:jMFz6MfLP0/4fUyZle81rXUoxOBFi19VUFKVDOQfozc=
|
||||||
gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw=
|
gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw=
|
||||||
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
|
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
|
||||||
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY=
|
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY=
|
||||||
|
@ -19,6 +19,7 @@ import (
|
|||||||
|
|
||||||
"github.com/filecoin-project/lotus/api"
|
"github.com/filecoin-project/lotus/api"
|
||||||
"github.com/filecoin-project/lotus/chain"
|
"github.com/filecoin-project/lotus/chain"
|
||||||
|
"github.com/filecoin-project/lotus/chain/blocksync"
|
||||||
"github.com/filecoin-project/lotus/chain/deals"
|
"github.com/filecoin-project/lotus/chain/deals"
|
||||||
"github.com/filecoin-project/lotus/chain/market"
|
"github.com/filecoin-project/lotus/chain/market"
|
||||||
"github.com/filecoin-project/lotus/chain/metrics"
|
"github.com/filecoin-project/lotus/chain/metrics"
|
||||||
@ -197,14 +198,14 @@ func Online() Option {
|
|||||||
|
|
||||||
// Filecoin services
|
// Filecoin services
|
||||||
Override(new(*chain.Syncer), chain.NewSyncer),
|
Override(new(*chain.Syncer), chain.NewSyncer),
|
||||||
Override(new(*chain.BlockSync), chain.NewBlockSyncClient),
|
Override(new(*blocksync.BlockSync), blocksync.NewBlockSyncClient),
|
||||||
Override(new(*chain.MessagePool), chain.NewMessagePool),
|
Override(new(*chain.MessagePool), chain.NewMessagePool),
|
||||||
|
|
||||||
Override(new(modules.Genesis), modules.ErrorGenesis),
|
Override(new(modules.Genesis), modules.ErrorGenesis),
|
||||||
Override(SetGenesisKey, modules.SetGenesis),
|
Override(SetGenesisKey, modules.SetGenesis),
|
||||||
|
|
||||||
Override(new(*hello.Service), hello.NewHelloService),
|
Override(new(*hello.Service), hello.NewHelloService),
|
||||||
Override(new(*chain.BlockSyncService), chain.NewBlockSyncService),
|
Override(new(*blocksync.BlockSyncService), blocksync.NewBlockSyncService),
|
||||||
Override(new(*peermgr.PeerMgr), peermgr.NewPeerMgr),
|
Override(new(*peermgr.PeerMgr), peermgr.NewPeerMgr),
|
||||||
|
|
||||||
Override(RunHelloKey, modules.RunHello),
|
Override(RunHelloKey, modules.RunHello),
|
||||||
|
@ -12,6 +12,7 @@ import (
|
|||||||
"github.com/libp2p/go-libp2p-core/host"
|
"github.com/libp2p/go-libp2p-core/host"
|
||||||
inet "github.com/libp2p/go-libp2p-core/network"
|
inet "github.com/libp2p/go-libp2p-core/network"
|
||||||
"github.com/libp2p/go-libp2p-core/peer"
|
"github.com/libp2p/go-libp2p-core/peer"
|
||||||
|
protocol "github.com/libp2p/go-libp2p-protocol"
|
||||||
|
|
||||||
"github.com/filecoin-project/lotus/chain"
|
"github.com/filecoin-project/lotus/chain"
|
||||||
"github.com/filecoin-project/lotus/chain/store"
|
"github.com/filecoin-project/lotus/chain/store"
|
||||||
@ -34,8 +35,9 @@ type Message struct {
|
|||||||
GenesisHash cid.Cid
|
GenesisHash cid.Cid
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type NewStreamFunc func(context.Context, peer.ID, ...protocol.ID) (inet.Stream, error)
|
||||||
type Service struct {
|
type Service struct {
|
||||||
newStream chain.NewStreamFunc
|
newStream NewStreamFunc
|
||||||
|
|
||||||
cs *store.ChainStore
|
cs *store.ChainStore
|
||||||
syncer *chain.Syncer
|
syncer *chain.Syncer
|
||||||
|
@ -9,6 +9,7 @@ import (
|
|||||||
"go.uber.org/fx"
|
"go.uber.org/fx"
|
||||||
|
|
||||||
"github.com/filecoin-project/lotus/chain"
|
"github.com/filecoin-project/lotus/chain"
|
||||||
|
"github.com/filecoin-project/lotus/chain/blocksync"
|
||||||
"github.com/filecoin-project/lotus/chain/deals"
|
"github.com/filecoin-project/lotus/chain/deals"
|
||||||
"github.com/filecoin-project/lotus/chain/sub"
|
"github.com/filecoin-project/lotus/chain/sub"
|
||||||
"github.com/filecoin-project/lotus/node/hello"
|
"github.com/filecoin-project/lotus/node/hello"
|
||||||
@ -37,8 +38,8 @@ func RunPeerMgr(mctx helpers.MetricsCtx, lc fx.Lifecycle, pmgr *peermgr.PeerMgr)
|
|||||||
go pmgr.Run(helpers.LifecycleCtx(mctx, lc))
|
go pmgr.Run(helpers.LifecycleCtx(mctx, lc))
|
||||||
}
|
}
|
||||||
|
|
||||||
func RunBlockSync(h host.Host, svc *chain.BlockSyncService) {
|
func RunBlockSync(h host.Host, svc *blocksync.BlockSyncService) {
|
||||||
h.SetStreamHandler(chain.BlockSyncProtocolID, svc.HandleStream)
|
h.SetStreamHandler(blocksync.BlockSyncProtocolID, svc.HandleStream)
|
||||||
}
|
}
|
||||||
|
|
||||||
func HandleIncomingBlocks(mctx helpers.MetricsCtx, lc fx.Lifecycle, pubsub *pubsub.PubSub, s *chain.Syncer) {
|
func HandleIncomingBlocks(mctx helpers.MetricsCtx, lc fx.Lifecycle, pubsub *pubsub.PubSub, s *chain.Syncer) {
|
||||||
|
@ -142,6 +142,7 @@ func (p *post) doPost(ctx context.Context) error {
|
|||||||
func (p *post) preparePost(ctx context.Context) error {
|
func (p *post) preparePost(ctx context.Context) error {
|
||||||
ctx, span := trace.StartSpan(ctx, "storage.preparePost")
|
ctx, span := trace.StartSpan(ctx, "storage.preparePost")
|
||||||
defer span.End()
|
defer span.End()
|
||||||
|
log.Info("preparePost")
|
||||||
|
|
||||||
sset, err := p.m.api.StateMinerProvingSet(ctx, p.m.maddr, p.ts)
|
sset, err := p.m.api.StateMinerProvingSet(ctx, p.m.maddr, p.ts)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -251,6 +252,7 @@ func (p *post) waitCommit(ctx context.Context) error {
|
|||||||
log.Warnf("SubmitPoSt EXIT: %d", rec.Receipt.ExitCode)
|
log.Warnf("SubmitPoSt EXIT: %d", rec.Receipt.ExitCode)
|
||||||
// TODO: Do something
|
// TODO: Do something
|
||||||
}
|
}
|
||||||
|
log.Infof("Post made it on chain! (height=%d)", rec.TipSet.Height())
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user