diff --git a/chain/block_receipt_tracker.go b/chain/block_receipt_tracker.go new file mode 100644 index 000000000..f182fd180 --- /dev/null +++ b/chain/block_receipt_tracker.go @@ -0,0 +1,71 @@ +package chain + +import ( + "sort" + "sync" + "time" + + "github.com/filecoin-project/lotus/chain/types" + "github.com/hashicorp/golang-lru" + peer "github.com/libp2p/go-libp2p-core/peer" +) + +type blockReceiptTracker struct { + lk sync.Mutex + + // using an LRU cache because i don't want to handle all the edge cases for + // manual cleanup and maintenance of a fixed size set + cache *lru.Cache +} + +type peerSet struct { + peers map[peer.ID]time.Time +} + +func newBlockReceiptTracker() *blockReceiptTracker { + c, _ := lru.New(512) + return &blockReceiptTracker{ + cache: c, + } +} + +func (brt *blockReceiptTracker) Add(p peer.ID, ts *types.TipSet) { + brt.lk.Lock() + defer brt.lk.Unlock() + + val, ok := brt.cache.Get(ts.Key()) + if !ok { + pset := &peerSet{ + peers: map[peer.ID]time.Time{ + p: time.Now(), + }, + } + brt.cache.Add(ts.Key(), pset) + return + } + + val.(*peerSet).peers[p] = time.Now() +} + +func (brt *blockReceiptTracker) GetPeers(ts *types.TipSet) []peer.ID { + brt.lk.Lock() + defer brt.lk.Unlock() + + val, ok := brt.cache.Get(ts.Key()) + if !ok { + return nil + } + + ps := val.(*peerSet) + + out := make([]peer.ID, 0, len(ps.peers)) + for p := range ps.peers { + out = append(out, p) + } + + sort.Slice(out, func(i, j int) bool { + return ps.peers[out[i]].Before(ps.peers[out[j]]) + }) + + return out +} diff --git a/chain/sub/incoming.go b/chain/sub/incoming.go index 681165f47..79dc29b80 100644 --- a/chain/sub/incoming.go +++ b/chain/sub/incoming.go @@ -56,12 +56,12 @@ func HandleIncomingBlocks(ctx context.Context, bsub *pubsub.Subscription, s *cha log.Warnf("Received block with large delay %d from miner %s", delay, blk.Header.Miner) } - if s.InformNewBlock(msg.GetFrom(), &types.FullBlock{ + if s.InformNewBlock(msg.ReceivedFrom, &types.FullBlock{ Header: blk.Header, BlsMessages: bmsgs, SecpkMessages: smsgs, }) { - cmgr.TagPeer(msg.ReceivedFrom, "blkprop", 20) + cmgr.TagPeer(msg.ReceivedFrom, "blkprop", 5) } }() } diff --git a/chain/sync.go b/chain/sync.go index b8018c793..3c80b3275 100644 --- a/chain/sync.go +++ b/chain/sync.go @@ -18,6 +18,7 @@ import ( hamt "github.com/ipfs/go-hamt-ipld" bstore "github.com/ipfs/go-ipfs-blockstore" logging "github.com/ipfs/go-log" + "github.com/libp2p/go-libp2p-core/connmgr" "github.com/libp2p/go-libp2p-core/peer" cbg "github.com/whyrusleeping/cbor-gen" "github.com/whyrusleeping/pubsub" @@ -42,8 +43,6 @@ var log = logging.Logger("chain") var LocalIncoming = "incoming" type Syncer struct { - // The heaviest known tipset in the network. - // The interface for accessing and putting tipsets into local storage store *store.ChainStore @@ -63,10 +62,14 @@ type Syncer struct { syncmgr *SyncManager + connmgr connmgr.ConnManager + incoming *pubsub.PubSub + + receiptTracker *blockReceiptTracker } -func NewSyncer(sm *stmgr.StateManager, bsync *blocksync.BlockSync, self peer.ID) (*Syncer, error) { +func NewSyncer(sm *stmgr.StateManager, bsync *blocksync.BlockSync, connmgr connmgr.ConnManager, self peer.ID) (*Syncer, error) { gen, err := sm.ChainStore().GetGenesis() if err != nil { return nil, err @@ -78,12 +81,14 @@ func NewSyncer(sm *stmgr.StateManager, bsync *blocksync.BlockSync, self peer.ID) } s := &Syncer{ - bad: NewBadBlockCache(), - Genesis: gent, - Bsync: bsync, - store: sm.ChainStore(), - sm: sm, - self: self, + bad: NewBadBlockCache(), + Genesis: gent, + Bsync: bsync, + store: sm.ChainStore(), + sm: sm, + self: self, + receiptTracker: newBlockReceiptTracker(), + connmgr: connmgr, incoming: pubsub.New(50), } @@ -401,6 +406,15 @@ func (syncer *Syncer) Sync(ctx context.Context, maybeHead *types.TipSet) error { return xerrors.Errorf("failed to put synced tipset to chainstore: %w", err) } + peers := syncer.receiptTracker.GetPeers(maybeHead) + if len(peers) > 0 { + syncer.connmgr.TagPeer(peers[0], "new-block", 40) + + for _, p := range peers[1:] { + syncer.connmgr.TagPeer(p, "new-block", 25) + } + } + return nil } diff --git a/go.sum b/go.sum index f7e70bd83..6cd2e9703 100644 --- a/go.sum +++ b/go.sum @@ -7,7 +7,6 @@ github.com/AndreasBriese/bbloom v0.0.0-20190306092124-e2d15f34fcf9 h1:HD8gA2tkBy github.com/AndreasBriese/bbloom v0.0.0-20190306092124-e2d15f34fcf9/go.mod h1:bOvUY6CB00SOBii9/FifXqc0awNKxLFCL/+pkDPuyl8= github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= -github.com/GeertJohan/go.incremental v1.0.0 h1:7AH+pY1XUgQE4Y1HcXYaMqAI0m9yrFqo/jt0CW30vsg= github.com/GeertJohan/go.incremental v1.0.0/go.mod h1:6fAjUhbVuX1KcMD3c8TEgVUqmo4seqhv0i0kdATSkM0= github.com/GeertJohan/go.rice v1.0.0 h1:KkI6O9uMaQU3VEKaj01ulavtF7o1fWT7+pk/4voiMLQ= github.com/GeertJohan/go.rice v1.0.0/go.mod h1:eH6gbSOAUv07dQuZVnBmoDP8mgsM1rtixis4Tib9if0= @@ -21,7 +20,6 @@ github.com/StackExchange/wmi v0.0.0-20190523213315-cbe66965904d/go.mod h1:3eOhrU github.com/Stebalien/go-bitfield v0.0.1 h1:X3kbSSPUaJK60wV2hjOPZwmpljr6VGCqdq4cBLhbQBo= github.com/Stebalien/go-bitfield v0.0.1/go.mod h1:GNjFpasyUVkHMsfEOk8EFLJ9syQ6SI+XWrX9Wf2XH0s= github.com/aead/siphash v1.0.1/go.mod h1:Nywa3cDsYNNK3gaciGTWPwHt0wlpNV15vwmswBAUSII= -github.com/akavel/rsrc v0.8.0 h1:zjWn7ukO9Kc5Q62DOJCcxGpXC18RawVtYAGdz2aLlfw= github.com/akavel/rsrc v0.8.0/go.mod h1:uLoCtb9J+EyAqh+26kdrTgmzRBFPGOolLWKpdxkKq+c= github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= @@ -269,7 +267,6 @@ github.com/jbenet/goprocess v0.0.0-20160826012719-b497e2f366b8/go.mod h1:Ly/wlsj github.com/jbenet/goprocess v0.1.3 h1:YKyIEECS/XvcfHtBzxtjBBbWK+MbvA6dG8ASiqwvr10= github.com/jbenet/goprocess v0.1.3/go.mod h1:5yspPrukOVuOLORacaBi858NqyClJPQxYZlqdZVfqY4= github.com/jessevdk/go-flags v0.0.0-20141203071132-1679536dcc89/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI= -github.com/jessevdk/go-flags v1.4.0 h1:4IU2WS7AumrZ/40jfhf4QVDMsQwqA7VEHozFRrGARJA= github.com/jessevdk/go-flags v1.4.0/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI= github.com/jrick/logrotate v1.0.0/go.mod h1:LNinyqDIJnpAur+b8yyulnQw/wDuN1+BYKlTRt3OuAQ= github.com/jtolds/gls v4.2.1+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU= @@ -532,7 +529,6 @@ github.com/multiformats/go-multistream v0.1.0/go.mod h1:fJTiDfXJVmItycydCnNx4+wS github.com/multiformats/go-varint v0.0.1 h1:TR/0rdQtnNxuN2IhiB639xC3tWM4IUi7DkTBVTdGW/M= github.com/multiformats/go-varint v0.0.1/go.mod h1:3Ls8CIEsrijN6+B7PbrXRPxHRPuXSrVKRY101jdMZYE= github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= -github.com/nkovacs/streamquote v0.0.0-20170412213628-49af9bddb229 h1:E2B8qYyeSgv5MXpmzZXRNp8IAQ4vjxIjhpAf5hv/tAg= github.com/nkovacs/streamquote v0.0.0-20170412213628-49af9bddb229/go.mod h1:0aYXnNPJ8l7uZxf45rWW1a/uME32OF0rhiYGNQ2oF2E= github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= github.com/onsi/ginkgo v1.7.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= @@ -607,9 +603,7 @@ github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81P github.com/syndtr/goleveldb v1.0.0/go.mod h1:ZVVdQEZoIme9iO1Ch2Jdy24qqXrMMOU6lpPAyBWyWuQ= github.com/ugorji/go/codec v0.0.0-20181204163529-d75b2dcb6bc8/go.mod h1:VFNgLljTbGfSG7qAOspJ7OScBnGdDN/yBr0sguwnwf0= github.com/urfave/cli v1.20.0/go.mod h1:70zkFmudgCuE/ngEzBv17Jvp/497gISqfk5gWijbERA= -github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw= github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc= -github.com/valyala/fasttemplate v1.0.1 h1:tY9CJiPnMXf1ERmG2EyK7gNUd+c6RKGD0IfU8WdUSz8= github.com/valyala/fasttemplate v1.0.1/go.mod h1:UQGH1tvbgY+Nz5t2n7tXsz52dQxojPUpymEIMZ47gx8= github.com/warpfork/go-wish v0.0.0-20180510122957-5ad1f5abf436/go.mod h1:x6AKhvSSexNrVSrViXSHUEbICjmGXhtgABaHIySUSGw= github.com/warpfork/go-wish v0.0.0-20190328234359-8b3e70f8e830 h1:8kxMKmKzXXL4Ru1nyhvdms/JjWt+3YLpvRb/bAjO/y0= diff --git a/node/modules/chain.go b/node/modules/chain.go index ed5a5f4a1..bc0ff3ce2 100644 --- a/node/modules/chain.go +++ b/node/modules/chain.go @@ -11,7 +11,6 @@ import ( "github.com/ipfs/go-datastore" blockstore "github.com/ipfs/go-ipfs-blockstore" "github.com/libp2p/go-libp2p-core/host" - peer "github.com/libp2p/go-libp2p-core/peer" "github.com/libp2p/go-libp2p-core/routing" pubsub "github.com/libp2p/go-libp2p-pubsub" "go.uber.org/fx" @@ -127,8 +126,8 @@ func SetGenesis(cs *store.ChainStore, g Genesis) error { return cs.SetGenesis(genesis) } -func NewSyncer(lc fx.Lifecycle, sm *stmgr.StateManager, bsync *blocksync.BlockSync, self peer.ID) (*chain.Syncer, error) { - syncer, err := chain.NewSyncer(sm, bsync, self) +func NewSyncer(lc fx.Lifecycle, sm *stmgr.StateManager, bsync *blocksync.BlockSync, h host.Host) (*chain.Syncer, error) { + syncer, err := chain.NewSyncer(sm, bsync, h.ConnManager(), h.ID()) if err != nil { return nil, err }