diff --git a/chain/blocksync/blocksync.go b/chain/blocksync/blocksync.go new file mode 100644 index 000000000..fd352a9ea --- /dev/null +++ b/chain/blocksync/blocksync.go @@ -0,0 +1,237 @@ +package blocksync + +import ( + "bufio" + "context" + + "github.com/libp2p/go-libp2p-core/protocol" + "go.opencensus.io/trace" + "golang.org/x/xerrors" + + "github.com/filecoin-project/lotus/chain/store" + "github.com/filecoin-project/lotus/chain/types" + "github.com/filecoin-project/lotus/lib/cborutil" + + "github.com/ipfs/go-cid" + cbor "github.com/ipfs/go-ipld-cbor" + logging "github.com/ipfs/go-log" + inet "github.com/libp2p/go-libp2p-core/network" + "github.com/libp2p/go-libp2p-core/peer" +) + +var log = logging.Logger("blocksync") + +type NewStreamFunc func(context.Context, peer.ID, ...protocol.ID) (inet.Stream, error) + +const BlockSyncProtocolID = "/fil/sync/blk/0.0.1" + +func init() { + cbor.RegisterCborType(BlockSyncRequest{}) + cbor.RegisterCborType(BlockSyncResponse{}) + cbor.RegisterCborType(BSTipSet{}) +} + +type BlockSyncService struct { + cs *store.ChainStore +} + +type BlockSyncRequest struct { + Start []cid.Cid + RequestLength uint64 + + Options uint64 +} + +type BSOptions struct { + IncludeBlocks bool + IncludeMessages bool +} + +func ParseBSOptions(optfield uint64) *BSOptions { + return &BSOptions{ + IncludeBlocks: optfield&(BSOptBlocks) != 0, + IncludeMessages: optfield&(BSOptMessages) != 0, + } +} + +const ( + BSOptBlocks = 1 << 0 + BSOptMessages = 1 << 1 +) + +type BlockSyncResponse struct { + Chain []*BSTipSet + + Status uint64 + Message string +} + +type BSTipSet struct { + Blocks []*types.BlockHeader + + BlsMessages []*types.Message + BlsMsgIncludes [][]uint64 + + SecpkMessages []*types.SignedMessage + SecpkMsgIncludes [][]uint64 +} + +func NewBlockSyncService(cs *store.ChainStore) *BlockSyncService { + return &BlockSyncService{ + cs: cs, + } +} + +func (bss *BlockSyncService) HandleStream(s inet.Stream) { + ctx, span := trace.StartSpan(context.Background(), "blocksync.HandleStream") + defer span.End() + + defer s.Close() + + var req BlockSyncRequest + if err := cborutil.ReadCborRPC(bufio.NewReader(s), &req); err != nil { + log.Errorf("failed to read block sync request: %s", err) + return + } + log.Infof("block sync request for: %s %d", req.Start, req.RequestLength) + + resp, err := bss.processRequest(ctx, &req) + if err != nil { + log.Error("failed to process block sync request: ", err) + return + } + + if err := cborutil.WriteCborRPC(s, resp); err != nil { + log.Error("failed to write back response for handle stream: ", err) + return + } +} + +func (bss *BlockSyncService) processRequest(ctx context.Context, req *BlockSyncRequest) (*BlockSyncResponse, error) { + ctx, span := trace.StartSpan(ctx, "blocksync.ProcessRequest") + defer span.End() + + opts := ParseBSOptions(req.Options) + if len(req.Start) == 0 { + return &BlockSyncResponse{ + Status: 204, + Message: "no cids given in blocksync request", + }, nil + } + + span.AddAttributes( + trace.BoolAttribute("blocks", opts.IncludeBlocks), + trace.BoolAttribute("messages", opts.IncludeMessages), + ) + + chain, err := bss.collectChainSegment(req.Start, req.RequestLength, opts) + if err != nil { + log.Warn("encountered error while responding to block sync request: ", err) + return &BlockSyncResponse{ + Status: 203, + }, nil + } + + return &BlockSyncResponse{ + Chain: chain, + Status: 0, + }, nil +} + +func (bss *BlockSyncService) collectChainSegment(start []cid.Cid, length uint64, opts *BSOptions) ([]*BSTipSet, error) { + var bstips []*BSTipSet + cur := start + for { + var bst BSTipSet + ts, err := bss.cs.LoadTipSet(cur) + if err != nil { + return nil, xerrors.Errorf("failed loading tipset %s: %w", cur, err) + } + + if opts.IncludeMessages { + bmsgs, bmincl, smsgs, smincl, err := bss.gatherMessages(ts) + if err != nil { + return nil, xerrors.Errorf("gather messages failed: %w", err) + } + + bst.BlsMessages = bmsgs + bst.BlsMsgIncludes = bmincl + bst.SecpkMessages = smsgs + bst.SecpkMsgIncludes = smincl + } + + if opts.IncludeBlocks { + bst.Blocks = ts.Blocks() + } + + bstips = append(bstips, &bst) + + if uint64(len(bstips)) >= length || ts.Height() == 0 { + return bstips, nil + } + + cur = ts.Parents() + } +} + +func (bss *BlockSyncService) gatherMessages(ts *types.TipSet) ([]*types.Message, [][]uint64, []*types.SignedMessage, [][]uint64, error) { + blsmsgmap := make(map[cid.Cid]uint64) + secpkmsgmap := make(map[cid.Cid]uint64) + var secpkmsgs []*types.SignedMessage + var blsmsgs []*types.Message + var secpkincl, blsincl [][]uint64 + + for _, b := range ts.Blocks() { + bmsgs, smsgs, err := bss.cs.MessagesForBlock(b) + if err != nil { + return nil, nil, nil, nil, err + } + + bmi := make([]uint64, 0, len(bmsgs)) + for _, m := range bmsgs { + i, ok := blsmsgmap[m.Cid()] + if !ok { + i = uint64(len(blsmsgs)) + blsmsgs = append(blsmsgs, m) + blsmsgmap[m.Cid()] = i + } + + bmi = append(bmi, i) + } + blsincl = append(blsincl, bmi) + + smi := make([]uint64, 0, len(smsgs)) + for _, m := range smsgs { + i, ok := secpkmsgmap[m.Cid()] + if !ok { + i = uint64(len(secpkmsgs)) + secpkmsgs = append(secpkmsgs, m) + secpkmsgmap[m.Cid()] = i + } + + smi = append(smi, i) + } + secpkincl = append(secpkincl, smi) + } + + return blsmsgs, blsincl, secpkmsgs, secpkincl, nil +} + +func bstsToFullTipSet(bts *BSTipSet) (*store.FullTipSet, error) { + fts := &store.FullTipSet{} + for i, b := range bts.Blocks { + fb := &types.FullBlock{ + Header: b, + } + for _, mi := range bts.BlsMsgIncludes[i] { + fb.BlsMessages = append(fb.BlsMessages, bts.BlsMessages[mi]) + } + for _, mi := range bts.SecpkMsgIncludes[i] { + fb.SecpkMessages = append(fb.SecpkMessages, bts.SecpkMessages[mi]) + } + + fts.Blocks = append(fts.Blocks, fb) + } + + return fts, nil +} diff --git a/chain/blocksync.go b/chain/blocksync/blocksync_client.go similarity index 54% rename from chain/blocksync.go rename to chain/blocksync/blocksync_client.go index 71140f2e0..6af46a592 100644 --- a/chain/blocksync.go +++ b/chain/blocksync/blocksync_client.go @@ -1,15 +1,20 @@ -package chain +package blocksync import ( "bufio" "context" "fmt" "math/rand" + "sort" "sync" + "time" + blocks "github.com/ipfs/go-block-format" bserv "github.com/ipfs/go-blockservice" - "github.com/libp2p/go-libp2p-core/host" - "github.com/libp2p/go-libp2p-core/protocol" + "github.com/ipfs/go-cid" + inet "github.com/libp2p/go-libp2p-core/network" + "github.com/libp2p/go-libp2p-core/peer" + host "github.com/libp2p/go-libp2p-host" "go.opencensus.io/trace" "golang.org/x/xerrors" @@ -17,236 +22,24 @@ import ( "github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/lib/cborutil" "github.com/filecoin-project/lotus/node/modules/dtypes" - - blocks "github.com/ipfs/go-block-format" - "github.com/ipfs/go-cid" - cbor "github.com/ipfs/go-ipld-cbor" - inet "github.com/libp2p/go-libp2p-core/network" - "github.com/libp2p/go-libp2p-core/peer" ) -type NewStreamFunc func(context.Context, peer.ID, ...protocol.ID) (inet.Stream, error) - -const BlockSyncProtocolID = "/fil/sync/blk/0.0.1" - -func init() { - cbor.RegisterCborType(BlockSyncRequest{}) - cbor.RegisterCborType(BlockSyncResponse{}) - cbor.RegisterCborType(BSTipSet{}) -} - -type BlockSyncService struct { - cs *store.ChainStore -} - -type BlockSyncRequest struct { - Start []cid.Cid - RequestLength uint64 - - Options uint64 -} - -type BSOptions struct { - IncludeBlocks bool - IncludeMessages bool -} - -func ParseBSOptions(optfield uint64) *BSOptions { - return &BSOptions{ - IncludeBlocks: optfield&(BSOptBlocks) != 0, - IncludeMessages: optfield&(BSOptMessages) != 0, - } -} - -const ( - BSOptBlocks = 1 << 0 - BSOptMessages = 1 << 1 -) - -type BlockSyncResponse struct { - Chain []*BSTipSet - - Status uint64 - Message string -} - -type BSTipSet struct { - Blocks []*types.BlockHeader - - BlsMessages []*types.Message - BlsMsgIncludes [][]uint64 - - SecpkMessages []*types.SignedMessage - SecpkMsgIncludes [][]uint64 -} - -func NewBlockSyncService(cs *store.ChainStore) *BlockSyncService { - return &BlockSyncService{ - cs: cs, - } -} - -func (bss *BlockSyncService) HandleStream(s inet.Stream) { - ctx, span := trace.StartSpan(context.Background(), "blocksync.HandleStream") - defer span.End() - - defer s.Close() - - var req BlockSyncRequest - if err := cborutil.ReadCborRPC(bufio.NewReader(s), &req); err != nil { - log.Errorf("failed to read block sync request: %s", err) - return - } - log.Infof("block sync request for: %s %d", req.Start, req.RequestLength) - - resp, err := bss.processRequest(ctx, &req) - if err != nil { - log.Error("failed to process block sync request: ", err) - return - } - - if err := cborutil.WriteCborRPC(s, resp); err != nil { - log.Error("failed to write back response for handle stream: ", err) - return - } -} - -func (bss *BlockSyncService) processRequest(ctx context.Context, req *BlockSyncRequest) (*BlockSyncResponse, error) { - ctx, span := trace.StartSpan(ctx, "blocksync.ProcessRequest") - defer span.End() - - opts := ParseBSOptions(req.Options) - if len(req.Start) == 0 { - return &BlockSyncResponse{ - Status: 204, - Message: "no cids given in blocksync request", - }, nil - } - - span.AddAttributes( - trace.BoolAttribute("blocks", opts.IncludeBlocks), - trace.BoolAttribute("messages", opts.IncludeMessages), - ) - - chain, err := bss.collectChainSegment(req.Start, req.RequestLength, opts) - if err != nil { - log.Error("encountered error while responding to block sync request: ", err) - return &BlockSyncResponse{ - Status: 203, - }, nil - } - - return &BlockSyncResponse{ - Chain: chain, - Status: 0, - }, nil -} - -func (bss *BlockSyncService) collectChainSegment(start []cid.Cid, length uint64, opts *BSOptions) ([]*BSTipSet, error) { - var bstips []*BSTipSet - cur := start - for { - var bst BSTipSet - ts, err := bss.cs.LoadTipSet(cur) - if err != nil { - return nil, err - } - - if opts.IncludeMessages { - bmsgs, bmincl, smsgs, smincl, err := bss.gatherMessages(ts) - if err != nil { - return nil, xerrors.Errorf("gather messages failed: %w", err) - } - - bst.BlsMessages = bmsgs - bst.BlsMsgIncludes = bmincl - bst.SecpkMessages = smsgs - bst.SecpkMsgIncludes = smincl - } - - if opts.IncludeBlocks { - bst.Blocks = ts.Blocks() - } - - bstips = append(bstips, &bst) - - if uint64(len(bstips)) >= length || ts.Height() == 0 { - return bstips, nil - } - - cur = ts.Parents() - } -} - -func (bss *BlockSyncService) gatherMessages(ts *types.TipSet) ([]*types.Message, [][]uint64, []*types.SignedMessage, [][]uint64, error) { - blsmsgmap := make(map[cid.Cid]uint64) - secpkmsgmap := make(map[cid.Cid]uint64) - var secpkmsgs []*types.SignedMessage - var blsmsgs []*types.Message - var secpkincl, blsincl [][]uint64 - - for _, b := range ts.Blocks() { - bmsgs, smsgs, err := bss.cs.MessagesForBlock(b) - if err != nil { - return nil, nil, nil, nil, err - } - - bmi := make([]uint64, 0, len(bmsgs)) - for _, m := range bmsgs { - i, ok := blsmsgmap[m.Cid()] - if !ok { - i = uint64(len(blsmsgs)) - blsmsgs = append(blsmsgs, m) - blsmsgmap[m.Cid()] = i - } - - bmi = append(bmi, i) - } - blsincl = append(blsincl, bmi) - - smi := make([]uint64, 0, len(smsgs)) - for _, m := range smsgs { - i, ok := secpkmsgmap[m.Cid()] - if !ok { - i = uint64(len(secpkmsgs)) - secpkmsgs = append(secpkmsgs, m) - secpkmsgmap[m.Cid()] = i - } - - smi = append(smi, i) - } - secpkincl = append(secpkincl, smi) - } - - return blsmsgs, blsincl, secpkmsgs, secpkincl, nil -} - type BlockSync struct { - bserv bserv.BlockService - newStream NewStreamFunc + bserv bserv.BlockService + host host.Host syncPeersLk sync.Mutex - syncPeers map[peer.ID]struct{} + syncPeers *bsPeerTracker } func NewBlockSyncClient(bserv dtypes.ChainBlockService, h host.Host) *BlockSync { return &BlockSync{ bserv: bserv, - newStream: h.NewStream, - syncPeers: make(map[peer.ID]struct{}), + host: h, + syncPeers: newPeerTracker(), } } -func (bs *BlockSync) getPeers() []peer.ID { - bs.syncPeersLk.Lock() - defer bs.syncPeersLk.Unlock() - var out []peer.ID - for p := range bs.syncPeers { - out = append(out, p) - } - return out -} - func (bs *BlockSync) processStatus(req *BlockSyncRequest, res *BlockSyncResponse) error { switch res.Status { case 101: // Partial Response @@ -274,22 +67,20 @@ func (bs *BlockSync) GetBlocks(ctx context.Context, tipset []cid.Cid, count int) ) } - peers := bs.getPeers() - perm := rand.Perm(len(peers)) - // TODO: round robin through these peers on error - req := &BlockSyncRequest{ Start: tipset, RequestLength: uint64(count), Options: BSOptBlocks, } + peers := bs.getPeers() + var oerr error - for _, p := range perm { - res, err := bs.sendRequestToPeer(ctx, peers[p], req) + for _, p := range peers { + res, err := bs.sendRequestToPeer(ctx, p, req) if err != nil { oerr = err - log.Warnf("BlockSync request failed for peer %s: %s", peers[p].String(), err) + log.Warnf("BlockSync request failed for peer %s: %s", p.String(), err) continue } @@ -298,7 +89,7 @@ func (bs *BlockSync) GetBlocks(ctx context.Context, tipset []cid.Cid, count int) } oerr = bs.processStatus(req, res) if oerr != nil { - log.Warnf("BlockSync peer %s response was an error: %s", peers[p].String(), oerr) + log.Warnf("BlockSync peer %s response was an error: %s", p.String(), oerr) } } return nil, xerrors.Errorf("GetBlocks failed with all peers: %w", oerr) @@ -327,11 +118,11 @@ func (bs *BlockSync) GetFullTipSet(ctx context.Context, p peer.ID, h []cid.Cid) return bstsToFullTipSet(bts) case 101: // Partial Response - panic("not handled") + return nil, xerrors.Errorf("partial responses are not handled") case 201: // req.Start not found return nil, fmt.Errorf("not found") case 202: // Go Away - panic("not handled") + return nil, xerrors.Errorf("received 'go away' response peer") case 203: // Internal Error return nil, fmt.Errorf("block sync peer errored: %q", res.Message) case 204: // Invalid Request @@ -376,29 +167,20 @@ func (bs *BlockSync) GetChainMessages(ctx context.Context, h *types.TipSet, coun return nil, xerrors.Errorf("GetChainMessages failed with all peers(%d): %w", len(peers), err) } -func bstsToFullTipSet(bts *BSTipSet) (*store.FullTipSet, error) { - fts := &store.FullTipSet{} - for i, b := range bts.Blocks { - fb := &types.FullBlock{ - Header: b, - } - for _, mi := range bts.BlsMsgIncludes[i] { - fb.BlsMessages = append(fb.BlsMessages, bts.BlsMessages[mi]) - } - for _, mi := range bts.SecpkMsgIncludes[i] { - fb.SecpkMessages = append(fb.SecpkMessages, bts.SecpkMessages[mi]) - } +func (bs *BlockSync) sendRequestToPeer(ctx context.Context, p peer.ID, req *BlockSyncRequest) (*BlockSyncResponse, error) { + ctx, span := trace.StartSpan(ctx, "sendRequestToPeer") + defer span.End() - fts.Blocks = append(fts.Blocks, fb) + if span.IsRecordingEvents() { + span.AddAttributes( + trace.StringAttribute("peer", p.Pretty()), + ) } - return fts, nil -} - -func (bs *BlockSync) sendRequestToPeer(ctx context.Context, p peer.ID, req *BlockSyncRequest) (*BlockSyncResponse, error) { - s, err := bs.newStream(inet.WithNoDial(ctx, "should already have connection"), p, BlockSyncProtocolID) + s, err := bs.host.NewStream(inet.WithNoDial(ctx, "should already have connection"), p, BlockSyncProtocolID) if err != nil { - return nil, err + bs.RemovePeer(p) + return nil, xerrors.Errorf("failed to open stream to peer: %w", err) } if err := cborutil.WriteCborRPC(s, req); err != nil { @@ -447,9 +229,15 @@ func (bs *BlockSync) GetBlock(ctx context.Context, c cid.Cid) (*types.BlockHeade } func (bs *BlockSync) AddPeer(p peer.ID) { - bs.syncPeersLk.Lock() - defer bs.syncPeersLk.Unlock() - bs.syncPeers[p] = struct{}{} + bs.syncPeers.addPeer(p) +} + +func (bs *BlockSync) RemovePeer(p peer.ID) { + bs.syncPeers.removePeer(p) +} + +func (bs *BlockSync) getPeers() []peer.ID { + return bs.syncPeers.prefSortedPeers() } func (bs *BlockSync) FetchMessagesByCids(ctx context.Context, cids []cid.Cid) ([]*types.Message, error) { @@ -528,3 +316,83 @@ func (bs *BlockSync) fetchCids(ctx context.Context, cids []cid.Cid, cb func(int, return nil } + +type peerStats struct { + successes int + failures int + firstSeen time.Time +} + +type bsPeerTracker struct { + peers map[peer.ID]*peerStats + lk sync.Mutex +} + +func newPeerTracker() *bsPeerTracker { + return &bsPeerTracker{ + peers: make(map[peer.ID]*peerStats), + } +} +func (bpt *bsPeerTracker) addPeer(p peer.ID) { + bpt.lk.Lock() + defer bpt.lk.Unlock() + if _, ok := bpt.peers[p]; ok { + return + } + bpt.peers[p] = &peerStats{ + firstSeen: time.Now(), + } + +} + +func (bpt *bsPeerTracker) prefSortedPeers() []peer.ID { + // TODO: this could probably be cached, but as long as its not too many peers, fine for now + bpt.lk.Lock() + defer bpt.lk.Unlock() + out := make([]peer.ID, 0, len(bpt.peers)) + for p := range bpt.peers { + out = append(out, p) + } + + sort.Slice(out, func(i, j int) bool { + pi := bpt.peers[out[i]] + pj := bpt.peers[out[j]] + if pi.successes > pj.successes { + return true + } + if pi.failures < pj.successes { + return true + } + return pi.firstSeen.Before(pj.firstSeen) + }) + + return out +} + +func (bpt *bsPeerTracker) logSuccess(p peer.ID) { + bpt.lk.Lock() + defer bpt.lk.Unlock() + if pi, ok := bpt.peers[p]; !ok { + log.Warn("log success called on peer not in tracker") + return + } else { + pi.successes++ + } +} + +func (bpt *bsPeerTracker) logFailure(p peer.ID) { + bpt.lk.Lock() + defer bpt.lk.Unlock() + if pi, ok := bpt.peers[p]; !ok { + log.Warn("log failure called on peer not in tracker") + return + } else { + pi.failures++ + } +} + +func (bpt *bsPeerTracker) removePeer(p peer.ID) { + bpt.lk.Lock() + defer bpt.lk.Unlock() + delete(bpt.peers, p) +} diff --git a/chain/cbor_gen.go b/chain/blocksync/cbor_gen.go similarity index 78% rename from chain/cbor_gen.go rename to chain/blocksync/cbor_gen.go index 30e62071b..4af61a490 100644 --- a/chain/cbor_gen.go +++ b/chain/blocksync/cbor_gen.go @@ -1,11 +1,11 @@ -package chain +package blocksync import ( "fmt" "io" "github.com/filecoin-project/lotus/chain/types" - cid "github.com/ipfs/go-cid" + "github.com/ipfs/go-cid" cbg "github.com/whyrusleeping/cbor-gen" xerrors "golang.org/x/xerrors" ) @@ -15,11 +15,15 @@ import ( var _ = xerrors.Errorf func (t *BlockSyncRequest) MarshalCBOR(w io.Writer) error { + if t == nil { + _, err := w.Write(cbg.CborNull) + return err + } if _, err := w.Write([]byte{131}); err != nil { return err } - // t.t.Start ([]cid.Cid) + // t.t.Start ([]cid.Cid) (slice) if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajArray, uint64(len(t.Start)))); err != nil { return err } @@ -29,19 +33,20 @@ func (t *BlockSyncRequest) MarshalCBOR(w io.Writer) error { } } - // t.t.RequestLength (uint64) - if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajUnsignedInt, t.RequestLength)); err != nil { + // t.t.RequestLength (uint64) (uint64) + if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajUnsignedInt, uint64(t.RequestLength))); err != nil { return err } - // t.t.Options (uint64) - if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajUnsignedInt, t.Options)); err != nil { + // t.t.Options (uint64) (uint64) + if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajUnsignedInt, uint64(t.Options))); err != nil { return err } return nil } -func (t *BlockSyncRequest) UnmarshalCBOR(br io.Reader) error { +func (t *BlockSyncRequest) UnmarshalCBOR(r io.Reader) error { + br := cbg.GetPeeker(r) maj, extra, err := cbg.CborReadHeader(br) if err != nil { @@ -55,14 +60,14 @@ func (t *BlockSyncRequest) UnmarshalCBOR(br io.Reader) error { return fmt.Errorf("cbor input had wrong number of fields") } - // t.t.Start ([]cid.Cid) + // t.t.Start ([]cid.Cid) (slice) maj, extra, err = cbg.CborReadHeader(br) if err != nil { return err } if extra > 8192 { - return fmt.Errorf("array too large") + return fmt.Errorf("t.Start: array too large (%d)", extra) } if maj != cbg.MajArray { @@ -80,7 +85,7 @@ func (t *BlockSyncRequest) UnmarshalCBOR(br io.Reader) error { t.Start[i] = c } - // t.t.RequestLength (uint64) + // t.t.RequestLength (uint64) (uint64) maj, extra, err = cbg.CborReadHeader(br) if err != nil { @@ -89,8 +94,8 @@ func (t *BlockSyncRequest) UnmarshalCBOR(br io.Reader) error { if maj != cbg.MajUnsignedInt { return fmt.Errorf("wrong type for uint64 field") } - t.RequestLength = extra - // t.t.Options (uint64) + t.RequestLength = uint64(extra) + // t.t.Options (uint64) (uint64) maj, extra, err = cbg.CborReadHeader(br) if err != nil { @@ -99,16 +104,20 @@ func (t *BlockSyncRequest) UnmarshalCBOR(br io.Reader) error { if maj != cbg.MajUnsignedInt { return fmt.Errorf("wrong type for uint64 field") } - t.Options = extra + t.Options = uint64(extra) return nil } func (t *BlockSyncResponse) MarshalCBOR(w io.Writer) error { + if t == nil { + _, err := w.Write(cbg.CborNull) + return err + } if _, err := w.Write([]byte{131}); err != nil { return err } - // t.t.Chain ([]*chain.BSTipSet) + // t.t.Chain ([]*blocksync.BSTipSet) (slice) if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajArray, uint64(len(t.Chain)))); err != nil { return err } @@ -118,12 +127,12 @@ func (t *BlockSyncResponse) MarshalCBOR(w io.Writer) error { } } - // t.t.Status (uint64) - if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajUnsignedInt, t.Status)); err != nil { + // t.t.Status (uint64) (uint64) + if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajUnsignedInt, uint64(t.Status))); err != nil { return err } - // t.t.Message (string) + // t.t.Message (string) (string) if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajTextString, uint64(len(t.Message)))); err != nil { return err } @@ -133,7 +142,8 @@ func (t *BlockSyncResponse) MarshalCBOR(w io.Writer) error { return nil } -func (t *BlockSyncResponse) UnmarshalCBOR(br io.Reader) error { +func (t *BlockSyncResponse) UnmarshalCBOR(r io.Reader) error { + br := cbg.GetPeeker(r) maj, extra, err := cbg.CborReadHeader(br) if err != nil { @@ -147,14 +157,14 @@ func (t *BlockSyncResponse) UnmarshalCBOR(br io.Reader) error { return fmt.Errorf("cbor input had wrong number of fields") } - // t.t.Chain ([]*chain.BSTipSet) + // t.t.Chain ([]*blocksync.BSTipSet) (slice) maj, extra, err = cbg.CborReadHeader(br) if err != nil { return err } if extra > 8192 { - return fmt.Errorf("array too large") + return fmt.Errorf("t.Chain: array too large (%d)", extra) } if maj != cbg.MajArray { @@ -164,6 +174,7 @@ func (t *BlockSyncResponse) UnmarshalCBOR(br io.Reader) error { t.Chain = make([]*BSTipSet, extra) } for i := 0; i < int(extra); i++ { + var v BSTipSet if err := v.UnmarshalCBOR(br); err != nil { return err @@ -172,7 +183,7 @@ func (t *BlockSyncResponse) UnmarshalCBOR(br io.Reader) error { t.Chain[i] = &v } - // t.t.Status (uint64) + // t.t.Status (uint64) (uint64) maj, extra, err = cbg.CborReadHeader(br) if err != nil { @@ -181,39 +192,30 @@ func (t *BlockSyncResponse) UnmarshalCBOR(br io.Reader) error { if maj != cbg.MajUnsignedInt { return fmt.Errorf("wrong type for uint64 field") } - t.Status = extra - // t.t.Message (string) - - maj, extra, err = cbg.CborReadHeader(br) - if err != nil { - return err - } - - if maj != cbg.MajTextString { - return fmt.Errorf("expected cbor type 'text string' in input") - } - - if extra > 256*1024 { - return fmt.Errorf("string in cbor input too long") - } + t.Status = uint64(extra) + // t.t.Message (string) (string) { - buf := make([]byte, extra) - if _, err := io.ReadFull(br, buf); err != nil { + sval, err := cbg.ReadString(br) + if err != nil { return err } - t.Message = string(buf) + t.Message = string(sval) } return nil } func (t *BSTipSet) MarshalCBOR(w io.Writer) error { + if t == nil { + _, err := w.Write(cbg.CborNull) + return err + } if _, err := w.Write([]byte{133}); err != nil { return err } - // t.t.Blocks ([]*types.BlockHeader) + // t.t.Blocks ([]*types.BlockHeader) (slice) if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajArray, uint64(len(t.Blocks)))); err != nil { return err } @@ -223,7 +225,7 @@ func (t *BSTipSet) MarshalCBOR(w io.Writer) error { } } - // t.t.BlsMessages ([]*types.Message) + // t.t.BlsMessages ([]*types.Message) (slice) if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajArray, uint64(len(t.BlsMessages)))); err != nil { return err } @@ -233,7 +235,7 @@ func (t *BSTipSet) MarshalCBOR(w io.Writer) error { } } - // t.t.BlsMsgIncludes ([][]uint64) + // t.t.BlsMsgIncludes ([][]uint64) (slice) if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajArray, uint64(len(t.BlsMsgIncludes)))); err != nil { return err } @@ -248,7 +250,7 @@ func (t *BSTipSet) MarshalCBOR(w io.Writer) error { } } - // t.t.SecpkMessages ([]*types.SignedMessage) + // t.t.SecpkMessages ([]*types.SignedMessage) (slice) if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajArray, uint64(len(t.SecpkMessages)))); err != nil { return err } @@ -258,7 +260,7 @@ func (t *BSTipSet) MarshalCBOR(w io.Writer) error { } } - // t.t.SecpkMsgIncludes ([][]uint64) + // t.t.SecpkMsgIncludes ([][]uint64) (slice) if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajArray, uint64(len(t.SecpkMsgIncludes)))); err != nil { return err } @@ -275,7 +277,8 @@ func (t *BSTipSet) MarshalCBOR(w io.Writer) error { return nil } -func (t *BSTipSet) UnmarshalCBOR(br io.Reader) error { +func (t *BSTipSet) UnmarshalCBOR(r io.Reader) error { + br := cbg.GetPeeker(r) maj, extra, err := cbg.CborReadHeader(br) if err != nil { @@ -289,14 +292,14 @@ func (t *BSTipSet) UnmarshalCBOR(br io.Reader) error { return fmt.Errorf("cbor input had wrong number of fields") } - // t.t.Blocks ([]*types.BlockHeader) + // t.t.Blocks ([]*types.BlockHeader) (slice) maj, extra, err = cbg.CborReadHeader(br) if err != nil { return err } if extra > 8192 { - return fmt.Errorf("array too large") + return fmt.Errorf("t.Blocks: array too large (%d)", extra) } if maj != cbg.MajArray { @@ -306,6 +309,7 @@ func (t *BSTipSet) UnmarshalCBOR(br io.Reader) error { t.Blocks = make([]*types.BlockHeader, extra) } for i := 0; i < int(extra); i++ { + var v types.BlockHeader if err := v.UnmarshalCBOR(br); err != nil { return err @@ -314,14 +318,14 @@ func (t *BSTipSet) UnmarshalCBOR(br io.Reader) error { t.Blocks[i] = &v } - // t.t.BlsMessages ([]*types.Message) + // t.t.BlsMessages ([]*types.Message) (slice) maj, extra, err = cbg.CborReadHeader(br) if err != nil { return err } if extra > 8192 { - return fmt.Errorf("array too large") + return fmt.Errorf("t.BlsMessages: array too large (%d)", extra) } if maj != cbg.MajArray { @@ -331,6 +335,7 @@ func (t *BSTipSet) UnmarshalCBOR(br io.Reader) error { t.BlsMessages = make([]*types.Message, extra) } for i := 0; i < int(extra); i++ { + var v types.Message if err := v.UnmarshalCBOR(br); err != nil { return err @@ -339,14 +344,14 @@ func (t *BSTipSet) UnmarshalCBOR(br io.Reader) error { t.BlsMessages[i] = &v } - // t.t.BlsMsgIncludes ([][]uint64) + // t.t.BlsMsgIncludes ([][]uint64) (slice) maj, extra, err = cbg.CborReadHeader(br) if err != nil { return err } if extra > 8192 { - return fmt.Errorf("array too large") + return fmt.Errorf("t.BlsMsgIncludes: array too large (%d)", extra) } if maj != cbg.MajArray { @@ -366,7 +371,7 @@ func (t *BSTipSet) UnmarshalCBOR(br io.Reader) error { return err } if extra > 8192 { - return fmt.Errorf("array too large") + return fmt.Errorf("t.BlsMsgIncludes[i]: array too large (%d)", extra) } if maj != cbg.MajArray { @@ -392,14 +397,14 @@ func (t *BSTipSet) UnmarshalCBOR(br io.Reader) error { } } - // t.t.SecpkMessages ([]*types.SignedMessage) + // t.t.SecpkMessages ([]*types.SignedMessage) (slice) maj, extra, err = cbg.CborReadHeader(br) if err != nil { return err } if extra > 8192 { - return fmt.Errorf("array too large") + return fmt.Errorf("t.SecpkMessages: array too large (%d)", extra) } if maj != cbg.MajArray { @@ -409,6 +414,7 @@ func (t *BSTipSet) UnmarshalCBOR(br io.Reader) error { t.SecpkMessages = make([]*types.SignedMessage, extra) } for i := 0; i < int(extra); i++ { + var v types.SignedMessage if err := v.UnmarshalCBOR(br); err != nil { return err @@ -417,14 +423,14 @@ func (t *BSTipSet) UnmarshalCBOR(br io.Reader) error { t.SecpkMessages[i] = &v } - // t.t.SecpkMsgIncludes ([][]uint64) + // t.t.SecpkMsgIncludes ([][]uint64) (slice) maj, extra, err = cbg.CborReadHeader(br) if err != nil { return err } if extra > 8192 { - return fmt.Errorf("array too large") + return fmt.Errorf("t.SecpkMsgIncludes: array too large (%d)", extra) } if maj != cbg.MajArray { @@ -444,7 +450,7 @@ func (t *BSTipSet) UnmarshalCBOR(br io.Reader) error { return err } if extra > 8192 { - return fmt.Errorf("array too large") + return fmt.Errorf("t.SecpkMsgIncludes[i]: array too large (%d)", extra) } if maj != cbg.MajArray { diff --git a/chain/store/store.go b/chain/store/store.go index 3bbdaa0f7..dcef64027 100644 --- a/chain/store/store.go +++ b/chain/store/store.go @@ -231,7 +231,7 @@ func (cs *ChainStore) MaybeTakeHeavierTipSet(ctx context.Context, ts *types.TipS // TODO: don't do this for initial sync. Now that we don't have a // difference between 'bootstrap sync' and 'caught up' sync, we need // some other heuristic. - return cs.takeHeaviestTipSet(ts) + return cs.takeHeaviestTipSet(ctx, ts) } return nil } @@ -267,7 +267,10 @@ func (cs *ChainStore) reorgWorker(ctx context.Context) chan<- reorg { return out } -func (cs *ChainStore) takeHeaviestTipSet(ts *types.TipSet) error { +func (cs *ChainStore) takeHeaviestTipSet(ctx context.Context, ts *types.TipSet) error { + ctx, span := trace.StartSpan(ctx, "takeHeaviestTipSet") + defer span.End() + if cs.heaviest != nil { // buf if len(cs.reorgCh) > 0 { log.Warnf("Reorg channel running behind, %d reorgs buffered", len(cs.reorgCh)) @@ -280,6 +283,8 @@ func (cs *ChainStore) takeHeaviestTipSet(ts *types.TipSet) error { log.Warnf("no heaviest tipset found, using %s", ts.Cids()) } + span.AddAttributes(trace.BoolAttribute("newHead", true)) + log.Debugf("New heaviest tipset! %s", ts.Cids()) cs.heaviest = ts @@ -296,7 +301,7 @@ func (cs *ChainStore) takeHeaviestTipSet(ts *types.TipSet) error { func (cs *ChainStore) SetHead(ts *types.TipSet) error { cs.heaviestLk.Lock() defer cs.heaviestLk.Unlock() - return cs.takeHeaviestTipSet(ts) + return cs.takeHeaviestTipSet(context.TODO(), ts) } func (cs *ChainStore) Contains(ts *types.TipSet) (bool, error) { diff --git a/chain/sync.go b/chain/sync.go index 80df33d27..d0e24c7a0 100644 --- a/chain/sync.go +++ b/chain/sync.go @@ -12,6 +12,7 @@ import ( "github.com/filecoin-project/lotus/build" "github.com/filecoin-project/lotus/chain/actors" "github.com/filecoin-project/lotus/chain/address" + "github.com/filecoin-project/lotus/chain/blocksync" "github.com/filecoin-project/lotus/chain/state" "github.com/filecoin-project/lotus/chain/stmgr" "github.com/filecoin-project/lotus/chain/store" @@ -49,7 +50,7 @@ type Syncer struct { bad *BadBlockCache // handle to the block sync service - Bsync *BlockSync + Bsync *blocksync.BlockSync self peer.ID @@ -61,7 +62,7 @@ type Syncer struct { peerHeadsLk sync.Mutex } -func NewSyncer(sm *stmgr.StateManager, bsync *BlockSync, self peer.ID) (*Syncer, error) { +func NewSyncer(sm *stmgr.StateManager, bsync *blocksync.BlockSync, self peer.ID) (*Syncer, error) { gen, err := sm.ChainStore().GetGenesis() if err != nil { return nil, err @@ -111,14 +112,22 @@ func (syncer *Syncer) InformNewHead(from peer.ID, fts *store.FullTipSet) { return } + syncer.peerHeadsLk.Lock() syncer.peerHeads[from] = fts.TipSet() syncer.peerHeadsLk.Unlock() syncer.Bsync.AddPeer(from) + bestPweight := syncer.store.GetHeaviestTipSet().Blocks()[0].ParentWeight + targetWeight := fts.TipSet().Blocks()[0].ParentWeight + if targetWeight.LessThan(bestPweight) { + log.Warn("incoming tipset does not appear to be better than our best chain, ignoring for now") + return + } + go func() { if err := syncer.Sync(ctx, fts.TipSet()); err != nil { - log.Errorf("sync error: %+v", err) + log.Errorf("sync error (curW=%s, targetW=%s): %+v", bestPweight, targetWeight, err) } }() } @@ -350,6 +359,12 @@ func (syncer *Syncer) tryLoadFullTipSet(cids []cid.Cid) (*store.FullTipSet, erro func (syncer *Syncer) Sync(ctx context.Context, maybeHead *types.TipSet) error { ctx, span := trace.StartSpan(ctx, "chain.Sync") defer span.End() + if span.IsRecordingEvents() { + span.AddAttributes( + trace.StringAttribute("tipset", fmt.Sprint(maybeHead.Cids())), + trace.Int64Attribute("height", int64(maybeHead.Height())), + ) + } syncer.syncLock.Lock() defer syncer.syncLock.Unlock() @@ -359,10 +374,12 @@ func (syncer *Syncer) Sync(ctx context.Context, maybeHead *types.TipSet) error { } if err := syncer.collectChain(ctx, maybeHead); err != nil { + span.AddAttributes(trace.StringAttribute("col_error", err.Error())) return xerrors.Errorf("collectChain failed: %w", err) } if err := syncer.store.PutTipSet(ctx, maybeHead); err != nil { + span.AddAttributes(trace.StringAttribute("put_error", err.Error())) return xerrors.Errorf("failed to put synced tipset to chainstore: %w", err) } @@ -682,6 +699,15 @@ func (syncer *Syncer) collectHeaders(ctx context.Context, from *types.TipSet, to trace.Int64Attribute("toHeight", int64(to.Height())), ) + for _, pcid := range from.Parents() { + if syncer.bad.Has(pcid) { + for _, b := range from.Cids() { + syncer.bad.Add(b) + } + return nil, xerrors.Errorf("chain linked to block marked previously as bad (%s, %s)", from.Cids(), pcid) + } + } + blockSet := []*types.TipSet{from} at := from.Parents() @@ -724,6 +750,8 @@ loop: log.Errorf("failed to get blocks: %+v", err) + span.AddAttributes(trace.StringAttribute("error", err.Error())) + // This error will only be logged above, return nil, xerrors.Errorf("failed to get blocks: %w", err) } @@ -756,6 +784,13 @@ loop: log.Warnf("(fork detected) synced header chain (%s - %d) does not link to our best block (%s - %d)", from.Cids(), from.Height(), to.Cids(), to.Height()) fork, err := syncer.syncFork(ctx, last, to) if err != nil { + if xerrors.Is(err, ErrForkTooLong) { + // TODO: we're marking this block bad in the same way that we mark invalid blocks bad. Maybe distinguish? + log.Warn("adding forked chain to our bad tipset cache") + for _, b := range from.Blocks() { + syncer.bad.Add(b.Cid()) + } + } return nil, xerrors.Errorf("failed to sync fork: %w", err) } @@ -765,6 +800,8 @@ loop: return blockSet, nil } +var ErrForkTooLong = fmt.Errorf("fork longer than threshold") + func (syncer *Syncer) syncFork(ctx context.Context, from *types.TipSet, to *types.TipSet) ([]*types.TipSet, error) { tips, err := syncer.Bsync.GetBlocks(ctx, from.Parents(), build.ForkLengthThreshold) if err != nil { @@ -791,7 +828,7 @@ func (syncer *Syncer) syncFork(ctx context.Context, from *types.TipSet, to *type } } } - return nil, xerrors.Errorf("fork was longer than our threshold") + return nil, ErrForkTooLong } func (syncer *Syncer) syncMessagesAndCheckState(ctx context.Context, headers []*types.TipSet) error { @@ -873,7 +910,7 @@ func (syncer *Syncer) iterFullTipsets(ctx context.Context, headers []*types.TipS return nil } -func persistMessages(bs bstore.Blockstore, bst *BSTipSet) error { +func persistMessages(bs bstore.Blockstore, bst *blocksync.BSTipSet) error { for _, m := range bst.BlsMessages { //log.Infof("putting BLS message: %s", m.Cid()) if _, err := store.PutMessage(bs, m); err != nil { @@ -906,6 +943,8 @@ func (syncer *Syncer) collectChain(ctx context.Context, ts *types.TipSet) error return err } + span.AddAttributes(trace.Int64Attribute("syncChainLength", int64(len(headers)))) + if !headers[0].Equals(ts) { log.Errorf("collectChain headers[0] should be equal to sync target. Its not: %s != %s", headers[0].Cids(), ts.Cids()) } diff --git a/chain/types/cbor_gen.go b/chain/types/cbor_gen.go index fa3cddaf4..1ebe94a13 100644 --- a/chain/types/cbor_gen.go +++ b/chain/types/cbor_gen.go @@ -5,7 +5,7 @@ import ( "io" "math" - cid "github.com/ipfs/go-cid" + "github.com/ipfs/go-cid" cbg "github.com/whyrusleeping/cbor-gen" xerrors "golang.org/x/xerrors" ) diff --git a/gen/main.go b/gen/main.go index b2b0140e7..97590eead 100644 --- a/gen/main.go +++ b/gen/main.go @@ -8,6 +8,7 @@ import ( "github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/chain/actors" + "github.com/filecoin-project/lotus/chain/blocksync" "github.com/filecoin-project/lotus/chain/deals" "github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/paych" @@ -71,17 +72,15 @@ func main() { os.Exit(1) } - /* - err = gen.WriteTupleEncodersToFile("./chain/cbor_gen.go", "chain", - chain.BlockSyncRequest{}, - chain.BlockSyncResponse{}, - chain.BSTipSet{}, - ) - if err != nil { - fmt.Println(err) - os.Exit(1) - } - */ + err = gen.WriteTupleEncodersToFile("./chain/blocksync/cbor_gen.go", "blocksync", + blocksync.BlockSyncRequest{}, + blocksync.BlockSyncResponse{}, + blocksync.BSTipSet{}, + ) + if err != nil { + fmt.Println(err) + os.Exit(1) + } err = gen.WriteTupleEncodersToFile("./chain/actors/cbor_gen.go", "actors", actors.InitActorState{}, diff --git a/go.mod b/go.mod index 8e62267a7..5197ee9df 100644 --- a/go.mod +++ b/go.mod @@ -45,11 +45,13 @@ require ( github.com/libp2p/go-libp2p-connmgr v0.1.0 github.com/libp2p/go-libp2p-core v0.2.2 github.com/libp2p/go-libp2p-discovery v0.1.0 + github.com/libp2p/go-libp2p-host v0.1.0 github.com/libp2p/go-libp2p-kad-dht v0.1.1 github.com/libp2p/go-libp2p-mplex v0.2.1 github.com/libp2p/go-libp2p-peer v0.2.0 github.com/libp2p/go-libp2p-peerstore v0.1.3 github.com/libp2p/go-libp2p-pnet v0.1.0 + github.com/libp2p/go-libp2p-protocol v0.1.0 github.com/libp2p/go-libp2p-pubsub v0.1.0 github.com/libp2p/go-libp2p-quic-transport v0.1.1 github.com/libp2p/go-libp2p-record v0.1.1 diff --git a/go.sum b/go.sum index 7e29dcdf1..e6ae50214 100644 --- a/go.sum +++ b/go.sum @@ -22,7 +22,9 @@ github.com/Stebalien/go-bitfield v0.0.1/go.mod h1:GNjFpasyUVkHMsfEOk8EFLJ9syQ6SI github.com/aead/siphash v1.0.1/go.mod h1:Nywa3cDsYNNK3gaciGTWPwHt0wlpNV15vwmswBAUSII= github.com/akavel/rsrc v0.8.0 h1:zjWn7ukO9Kc5Q62DOJCcxGpXC18RawVtYAGdz2aLlfw= github.com/akavel/rsrc v0.8.0/go.mod h1:uLoCtb9J+EyAqh+26kdrTgmzRBFPGOolLWKpdxkKq+c= +github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc h1:cAKDfWh5VpdgMhJosfJnn5/FoN2SRZ4p7fJNX58YPaU= github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= +github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf h1:qet1QNfXsQxTZqLG4oE62mJzwPIB8+Tee4RNCL9ulrY= github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= github.com/apache/thrift v0.12.0 h1:pODnxUFNcjP9UTLZGTdeh+j16A8lJbRvD3rOtrk/7bs= github.com/apache/thrift v0.12.0/go.mod h1:cp2SuWMxlEZw2r+iP2GNCdIi4C1qmUzdZFSVb+bacwQ= @@ -303,6 +305,8 @@ github.com/libp2p/go-libp2p-crypto v0.1.0 h1:k9MFy+o2zGDNGsaoZl0MA3iZ75qXxr9OOoA github.com/libp2p/go-libp2p-crypto v0.1.0/go.mod h1:sPUokVISZiy+nNuTTH/TY+leRSxnFj/2GLjtOTW90hI= github.com/libp2p/go-libp2p-discovery v0.1.0 h1:j+R6cokKcGbnZLf4kcNwpx6mDEUPF3N6SrqMymQhmvs= github.com/libp2p/go-libp2p-discovery v0.1.0/go.mod h1:4F/x+aldVHjHDHuX85x1zWoFTGElt8HnoDzwkFZm29g= +github.com/libp2p/go-libp2p-host v0.1.0 h1:OZwENiFm6JOK3YR5PZJxkXlJE8a5u8g4YvAUrEV2MjM= +github.com/libp2p/go-libp2p-host v0.1.0/go.mod h1:5+fWuLbDn8OxoxPN3CV0vsLe1hAKScSMbT84qRfxum8= github.com/libp2p/go-libp2p-kad-dht v0.1.1 h1:IH6NQuoUv5w5e1O8Jc3KyVDtr0rNd0G9aaADpLI1xVo= github.com/libp2p/go-libp2p-kad-dht v0.1.1/go.mod h1:1kj2Rk5pX3/0RwqMm9AMNCT7DzcMHYhgDN5VTi+cY0M= github.com/libp2p/go-libp2p-kbucket v0.2.0 h1:FB2a0VkOTNGTP5gu/I444u4WabNM9V1zCkQcWb7zajI= @@ -323,6 +327,8 @@ github.com/libp2p/go-libp2p-peerstore v0.1.3 h1:wMgajt1uM2tMiqf4M+4qWKVyyFc8SfA+ github.com/libp2p/go-libp2p-peerstore v0.1.3/go.mod h1:BJ9sHlm59/80oSkpWgr1MyY1ciXAXV397W6h1GH/uKI= github.com/libp2p/go-libp2p-pnet v0.1.0 h1:kRUES28dktfnHNIRW4Ro78F7rKBHBiw5MJpl0ikrLIA= github.com/libp2p/go-libp2p-pnet v0.1.0/go.mod h1:ZkyZw3d0ZFOex71halXRihWf9WH/j3OevcJdTmD0lyE= +github.com/libp2p/go-libp2p-protocol v0.1.0 h1:HdqhEyhg0ToCaxgMhnOmUO8snQtt/kQlcjVk3UoJU3c= +github.com/libp2p/go-libp2p-protocol v0.1.0/go.mod h1:KQPHpAabB57XQxGrXCNvbL6UEXfQqUgC/1adR2Xtflk= github.com/libp2p/go-libp2p-pubsub v0.1.0 h1:SmQeMa7IUv5vadh0fYgYsafWCBA1sCy5d/68kIYqGcU= github.com/libp2p/go-libp2p-pubsub v0.1.0/go.mod h1:ZwlKzRSe1eGvSIdU5bD7+8RZN/Uzw0t1Bp9R1znpR/Q= github.com/libp2p/go-libp2p-quic-transport v0.1.1 h1:MFMJzvsxIEDEVKzO89BnB/FgvMj9WI4GDGUW2ArDPUA= @@ -474,6 +480,7 @@ github.com/prometheus/client_golang v0.9.1/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXP github.com/prometheus/client_golang v0.9.3-0.20190127221311-3c4408c8b829/go.mod h1:p2iRAGwDERtqlqzRXnrOVns+ignqQo//hLXqYxZYVNs= github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo= github.com/prometheus/client_model v0.0.0-20190115171406-56726106282f/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo= +github.com/prometheus/common v0.2.0 h1:kUZDBDTdBVBYBj5Tmh2NZLlF60mfjA27rM34b+cVwNU= github.com/prometheus/common v0.2.0/go.mod h1:TNfzLD0ON7rHzMJeJkieUDPYmFC7Snx/y86RQel1bk4= github.com/prometheus/procfs v0.0.0-20181005140218-185b4288413d/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk= github.com/prometheus/procfs v0.0.0-20190117184657-bf6a532e95b1/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk= @@ -481,6 +488,7 @@ github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqn github.com/russross/blackfriday v1.5.2/go.mod h1:JO/DiYxRf+HjHt06OyowR9PTA263kcR/rfWxYHBV53g= github.com/shirou/gopsutil v2.18.12+incompatible h1:1eaJvGomDnH74/5cF4CTmTbLHAriGFsTZppLXDX93OM= github.com/shirou/gopsutil v2.18.12+incompatible/go.mod h1:5b4v6he4MtMOwMlS0TUMTu2PcXUg8+E1lC7eC3UO/RA= +github.com/sirupsen/logrus v1.2.0 h1:juTguoYk5qI21pwyTXY3B3Y5cOTH3ZUyZCg1v/mihuo= github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo= github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d/go.mod h1:OnSkiWE9lh6wB0YB77sQom3nweQdgAjqCqsofrRNTgc= github.com/smartystreets/assertions v1.0.0/go.mod h1:kHHU4qYBaI3q23Pp3VPrmWhuIUrLW/7eUrw0BU5VaoM= @@ -683,6 +691,7 @@ google.golang.org/genproto v0.0.0-20190502173448-54afdca5d873/go.mod h1:VzzqZJRn google.golang.org/grpc v1.17.0/go.mod h1:6QZJwpn2B+Zp71q/5VxRsJ6NXXVCE5NRUHRo+f3cWCs= google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= google.golang.org/grpc v1.20.1/go.mod h1:10oTOabMzJvdu6/UiuZezV6QK5dSlG84ov/aaiqXj38= +gopkg.in/alecthomas/kingpin.v2 v2.2.6 h1:jMFz6MfLP0/4fUyZle81rXUoxOBFi19VUFKVDOQfozc= gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY= diff --git a/node/builder.go b/node/builder.go index 183472dbc..1d1e0d3e7 100644 --- a/node/builder.go +++ b/node/builder.go @@ -19,6 +19,7 @@ import ( "github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/chain" + "github.com/filecoin-project/lotus/chain/blocksync" "github.com/filecoin-project/lotus/chain/deals" "github.com/filecoin-project/lotus/chain/market" "github.com/filecoin-project/lotus/chain/metrics" @@ -197,14 +198,14 @@ func Online() Option { // Filecoin services Override(new(*chain.Syncer), chain.NewSyncer), - Override(new(*chain.BlockSync), chain.NewBlockSyncClient), + Override(new(*blocksync.BlockSync), blocksync.NewBlockSyncClient), Override(new(*chain.MessagePool), chain.NewMessagePool), Override(new(modules.Genesis), modules.ErrorGenesis), Override(SetGenesisKey, modules.SetGenesis), Override(new(*hello.Service), hello.NewHelloService), - Override(new(*chain.BlockSyncService), chain.NewBlockSyncService), + Override(new(*blocksync.BlockSyncService), blocksync.NewBlockSyncService), Override(new(*peermgr.PeerMgr), peermgr.NewPeerMgr), Override(RunHelloKey, modules.RunHello), diff --git a/node/hello/hello.go b/node/hello/hello.go index 90af18923..880ef96f1 100644 --- a/node/hello/hello.go +++ b/node/hello/hello.go @@ -12,6 +12,7 @@ import ( "github.com/libp2p/go-libp2p-core/host" inet "github.com/libp2p/go-libp2p-core/network" "github.com/libp2p/go-libp2p-core/peer" + protocol "github.com/libp2p/go-libp2p-protocol" "github.com/filecoin-project/lotus/chain" "github.com/filecoin-project/lotus/chain/store" @@ -34,8 +35,9 @@ type Message struct { GenesisHash cid.Cid } +type NewStreamFunc func(context.Context, peer.ID, ...protocol.ID) (inet.Stream, error) type Service struct { - newStream chain.NewStreamFunc + newStream NewStreamFunc cs *store.ChainStore syncer *chain.Syncer diff --git a/node/modules/services.go b/node/modules/services.go index c5c3df125..8cf298d8a 100644 --- a/node/modules/services.go +++ b/node/modules/services.go @@ -9,6 +9,7 @@ import ( "go.uber.org/fx" "github.com/filecoin-project/lotus/chain" + "github.com/filecoin-project/lotus/chain/blocksync" "github.com/filecoin-project/lotus/chain/deals" "github.com/filecoin-project/lotus/chain/sub" "github.com/filecoin-project/lotus/node/hello" @@ -37,8 +38,8 @@ func RunPeerMgr(mctx helpers.MetricsCtx, lc fx.Lifecycle, pmgr *peermgr.PeerMgr) go pmgr.Run(helpers.LifecycleCtx(mctx, lc)) } -func RunBlockSync(h host.Host, svc *chain.BlockSyncService) { - h.SetStreamHandler(chain.BlockSyncProtocolID, svc.HandleStream) +func RunBlockSync(h host.Host, svc *blocksync.BlockSyncService) { + h.SetStreamHandler(blocksync.BlockSyncProtocolID, svc.HandleStream) } func HandleIncomingBlocks(mctx helpers.MetricsCtx, lc fx.Lifecycle, pubsub *pubsub.PubSub, s *chain.Syncer) { diff --git a/storage/post.go b/storage/post.go index 3d6ac56b3..bdc0a434d 100644 --- a/storage/post.go +++ b/storage/post.go @@ -142,6 +142,7 @@ func (p *post) doPost(ctx context.Context) error { func (p *post) preparePost(ctx context.Context) error { ctx, span := trace.StartSpan(ctx, "storage.preparePost") defer span.End() + log.Info("preparePost") sset, err := p.m.api.StateMinerProvingSet(ctx, p.m.maddr, p.ts) if err != nil { @@ -251,6 +252,7 @@ func (p *post) waitCommit(ctx context.Context) error { log.Warnf("SubmitPoSt EXIT: %d", rec.Receipt.ExitCode) // TODO: Do something } + log.Infof("Post made it on chain! (height=%d)", rec.TipSet.Height()) return nil }