Merge pull request #984 from filecoin-project/feat/tag-good-peers
Tag peers who send us good blocks over pubsub
This commit is contained in:
commit
41aef3d2eb
71
chain/block_receipt_tracker.go
Normal file
71
chain/block_receipt_tracker.go
Normal file
@ -0,0 +1,71 @@
|
|||||||
|
package chain
|
||||||
|
|
||||||
|
import (
|
||||||
|
"sort"
|
||||||
|
"sync"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/filecoin-project/lotus/chain/types"
|
||||||
|
"github.com/hashicorp/golang-lru"
|
||||||
|
peer "github.com/libp2p/go-libp2p-core/peer"
|
||||||
|
)
|
||||||
|
|
||||||
|
type blockReceiptTracker struct {
|
||||||
|
lk sync.Mutex
|
||||||
|
|
||||||
|
// using an LRU cache because i don't want to handle all the edge cases for
|
||||||
|
// manual cleanup and maintenance of a fixed size set
|
||||||
|
cache *lru.Cache
|
||||||
|
}
|
||||||
|
|
||||||
|
type peerSet struct {
|
||||||
|
peers map[peer.ID]time.Time
|
||||||
|
}
|
||||||
|
|
||||||
|
func newBlockReceiptTracker() *blockReceiptTracker {
|
||||||
|
c, _ := lru.New(512)
|
||||||
|
return &blockReceiptTracker{
|
||||||
|
cache: c,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (brt *blockReceiptTracker) Add(p peer.ID, ts *types.TipSet) {
|
||||||
|
brt.lk.Lock()
|
||||||
|
defer brt.lk.Unlock()
|
||||||
|
|
||||||
|
val, ok := brt.cache.Get(ts.Key())
|
||||||
|
if !ok {
|
||||||
|
pset := &peerSet{
|
||||||
|
peers: map[peer.ID]time.Time{
|
||||||
|
p: time.Now(),
|
||||||
|
},
|
||||||
|
}
|
||||||
|
brt.cache.Add(ts.Key(), pset)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
val.(*peerSet).peers[p] = time.Now()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (brt *blockReceiptTracker) GetPeers(ts *types.TipSet) []peer.ID {
|
||||||
|
brt.lk.Lock()
|
||||||
|
defer brt.lk.Unlock()
|
||||||
|
|
||||||
|
val, ok := brt.cache.Get(ts.Key())
|
||||||
|
if !ok {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
ps := val.(*peerSet)
|
||||||
|
|
||||||
|
out := make([]peer.ID, 0, len(ps.peers))
|
||||||
|
for p := range ps.peers {
|
||||||
|
out = append(out, p)
|
||||||
|
}
|
||||||
|
|
||||||
|
sort.Slice(out, func(i, j int) bool {
|
||||||
|
return ps.peers[out[i]].Before(ps.peers[out[j]])
|
||||||
|
})
|
||||||
|
|
||||||
|
return out
|
||||||
|
}
|
@ -56,12 +56,12 @@ func HandleIncomingBlocks(ctx context.Context, bsub *pubsub.Subscription, s *cha
|
|||||||
log.Warnf("Received block with large delay %d from miner %s", delay, blk.Header.Miner)
|
log.Warnf("Received block with large delay %d from miner %s", delay, blk.Header.Miner)
|
||||||
}
|
}
|
||||||
|
|
||||||
if s.InformNewBlock(msg.GetFrom(), &types.FullBlock{
|
if s.InformNewBlock(msg.ReceivedFrom, &types.FullBlock{
|
||||||
Header: blk.Header,
|
Header: blk.Header,
|
||||||
BlsMessages: bmsgs,
|
BlsMessages: bmsgs,
|
||||||
SecpkMessages: smsgs,
|
SecpkMessages: smsgs,
|
||||||
}) {
|
}) {
|
||||||
cmgr.TagPeer(msg.ReceivedFrom, "blkprop", 20)
|
cmgr.TagPeer(msg.ReceivedFrom, "blkprop", 5)
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
}
|
}
|
||||||
|
@ -18,6 +18,7 @@ import (
|
|||||||
hamt "github.com/ipfs/go-hamt-ipld"
|
hamt "github.com/ipfs/go-hamt-ipld"
|
||||||
bstore "github.com/ipfs/go-ipfs-blockstore"
|
bstore "github.com/ipfs/go-ipfs-blockstore"
|
||||||
logging "github.com/ipfs/go-log"
|
logging "github.com/ipfs/go-log"
|
||||||
|
"github.com/libp2p/go-libp2p-core/connmgr"
|
||||||
"github.com/libp2p/go-libp2p-core/peer"
|
"github.com/libp2p/go-libp2p-core/peer"
|
||||||
cbg "github.com/whyrusleeping/cbor-gen"
|
cbg "github.com/whyrusleeping/cbor-gen"
|
||||||
"github.com/whyrusleeping/pubsub"
|
"github.com/whyrusleeping/pubsub"
|
||||||
@ -42,8 +43,6 @@ var log = logging.Logger("chain")
|
|||||||
var LocalIncoming = "incoming"
|
var LocalIncoming = "incoming"
|
||||||
|
|
||||||
type Syncer struct {
|
type Syncer struct {
|
||||||
// The heaviest known tipset in the network.
|
|
||||||
|
|
||||||
// The interface for accessing and putting tipsets into local storage
|
// The interface for accessing and putting tipsets into local storage
|
||||||
store *store.ChainStore
|
store *store.ChainStore
|
||||||
|
|
||||||
@ -63,10 +62,14 @@ type Syncer struct {
|
|||||||
|
|
||||||
syncmgr *SyncManager
|
syncmgr *SyncManager
|
||||||
|
|
||||||
|
connmgr connmgr.ConnManager
|
||||||
|
|
||||||
incoming *pubsub.PubSub
|
incoming *pubsub.PubSub
|
||||||
|
|
||||||
|
receiptTracker *blockReceiptTracker
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewSyncer(sm *stmgr.StateManager, bsync *blocksync.BlockSync, self peer.ID) (*Syncer, error) {
|
func NewSyncer(sm *stmgr.StateManager, bsync *blocksync.BlockSync, connmgr connmgr.ConnManager, self peer.ID) (*Syncer, error) {
|
||||||
gen, err := sm.ChainStore().GetGenesis()
|
gen, err := sm.ChainStore().GetGenesis()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
@ -78,12 +81,14 @@ func NewSyncer(sm *stmgr.StateManager, bsync *blocksync.BlockSync, self peer.ID)
|
|||||||
}
|
}
|
||||||
|
|
||||||
s := &Syncer{
|
s := &Syncer{
|
||||||
bad: NewBadBlockCache(),
|
bad: NewBadBlockCache(),
|
||||||
Genesis: gent,
|
Genesis: gent,
|
||||||
Bsync: bsync,
|
Bsync: bsync,
|
||||||
store: sm.ChainStore(),
|
store: sm.ChainStore(),
|
||||||
sm: sm,
|
sm: sm,
|
||||||
self: self,
|
self: self,
|
||||||
|
receiptTracker: newBlockReceiptTracker(),
|
||||||
|
connmgr: connmgr,
|
||||||
|
|
||||||
incoming: pubsub.New(50),
|
incoming: pubsub.New(50),
|
||||||
}
|
}
|
||||||
@ -401,6 +406,15 @@ func (syncer *Syncer) Sync(ctx context.Context, maybeHead *types.TipSet) error {
|
|||||||
return xerrors.Errorf("failed to put synced tipset to chainstore: %w", err)
|
return xerrors.Errorf("failed to put synced tipset to chainstore: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
peers := syncer.receiptTracker.GetPeers(maybeHead)
|
||||||
|
if len(peers) > 0 {
|
||||||
|
syncer.connmgr.TagPeer(peers[0], "new-block", 40)
|
||||||
|
|
||||||
|
for _, p := range peers[1:] {
|
||||||
|
syncer.connmgr.TagPeer(p, "new-block", 25)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
6
go.sum
6
go.sum
@ -7,7 +7,6 @@ github.com/AndreasBriese/bbloom v0.0.0-20190306092124-e2d15f34fcf9 h1:HD8gA2tkBy
|
|||||||
github.com/AndreasBriese/bbloom v0.0.0-20190306092124-e2d15f34fcf9/go.mod h1:bOvUY6CB00SOBii9/FifXqc0awNKxLFCL/+pkDPuyl8=
|
github.com/AndreasBriese/bbloom v0.0.0-20190306092124-e2d15f34fcf9/go.mod h1:bOvUY6CB00SOBii9/FifXqc0awNKxLFCL/+pkDPuyl8=
|
||||||
github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ=
|
github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ=
|
||||||
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
|
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
|
||||||
github.com/GeertJohan/go.incremental v1.0.0 h1:7AH+pY1XUgQE4Y1HcXYaMqAI0m9yrFqo/jt0CW30vsg=
|
|
||||||
github.com/GeertJohan/go.incremental v1.0.0/go.mod h1:6fAjUhbVuX1KcMD3c8TEgVUqmo4seqhv0i0kdATSkM0=
|
github.com/GeertJohan/go.incremental v1.0.0/go.mod h1:6fAjUhbVuX1KcMD3c8TEgVUqmo4seqhv0i0kdATSkM0=
|
||||||
github.com/GeertJohan/go.rice v1.0.0 h1:KkI6O9uMaQU3VEKaj01ulavtF7o1fWT7+pk/4voiMLQ=
|
github.com/GeertJohan/go.rice v1.0.0 h1:KkI6O9uMaQU3VEKaj01ulavtF7o1fWT7+pk/4voiMLQ=
|
||||||
github.com/GeertJohan/go.rice v1.0.0/go.mod h1:eH6gbSOAUv07dQuZVnBmoDP8mgsM1rtixis4Tib9if0=
|
github.com/GeertJohan/go.rice v1.0.0/go.mod h1:eH6gbSOAUv07dQuZVnBmoDP8mgsM1rtixis4Tib9if0=
|
||||||
@ -21,7 +20,6 @@ github.com/StackExchange/wmi v0.0.0-20190523213315-cbe66965904d/go.mod h1:3eOhrU
|
|||||||
github.com/Stebalien/go-bitfield v0.0.1 h1:X3kbSSPUaJK60wV2hjOPZwmpljr6VGCqdq4cBLhbQBo=
|
github.com/Stebalien/go-bitfield v0.0.1 h1:X3kbSSPUaJK60wV2hjOPZwmpljr6VGCqdq4cBLhbQBo=
|
||||||
github.com/Stebalien/go-bitfield v0.0.1/go.mod h1:GNjFpasyUVkHMsfEOk8EFLJ9syQ6SI+XWrX9Wf2XH0s=
|
github.com/Stebalien/go-bitfield v0.0.1/go.mod h1:GNjFpasyUVkHMsfEOk8EFLJ9syQ6SI+XWrX9Wf2XH0s=
|
||||||
github.com/aead/siphash v1.0.1/go.mod h1:Nywa3cDsYNNK3gaciGTWPwHt0wlpNV15vwmswBAUSII=
|
github.com/aead/siphash v1.0.1/go.mod h1:Nywa3cDsYNNK3gaciGTWPwHt0wlpNV15vwmswBAUSII=
|
||||||
github.com/akavel/rsrc v0.8.0 h1:zjWn7ukO9Kc5Q62DOJCcxGpXC18RawVtYAGdz2aLlfw=
|
|
||||||
github.com/akavel/rsrc v0.8.0/go.mod h1:uLoCtb9J+EyAqh+26kdrTgmzRBFPGOolLWKpdxkKq+c=
|
github.com/akavel/rsrc v0.8.0/go.mod h1:uLoCtb9J+EyAqh+26kdrTgmzRBFPGOolLWKpdxkKq+c=
|
||||||
github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
|
github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
|
||||||
github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
|
github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
|
||||||
@ -269,7 +267,6 @@ github.com/jbenet/goprocess v0.0.0-20160826012719-b497e2f366b8/go.mod h1:Ly/wlsj
|
|||||||
github.com/jbenet/goprocess v0.1.3 h1:YKyIEECS/XvcfHtBzxtjBBbWK+MbvA6dG8ASiqwvr10=
|
github.com/jbenet/goprocess v0.1.3 h1:YKyIEECS/XvcfHtBzxtjBBbWK+MbvA6dG8ASiqwvr10=
|
||||||
github.com/jbenet/goprocess v0.1.3/go.mod h1:5yspPrukOVuOLORacaBi858NqyClJPQxYZlqdZVfqY4=
|
github.com/jbenet/goprocess v0.1.3/go.mod h1:5yspPrukOVuOLORacaBi858NqyClJPQxYZlqdZVfqY4=
|
||||||
github.com/jessevdk/go-flags v0.0.0-20141203071132-1679536dcc89/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI=
|
github.com/jessevdk/go-flags v0.0.0-20141203071132-1679536dcc89/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI=
|
||||||
github.com/jessevdk/go-flags v1.4.0 h1:4IU2WS7AumrZ/40jfhf4QVDMsQwqA7VEHozFRrGARJA=
|
|
||||||
github.com/jessevdk/go-flags v1.4.0/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI=
|
github.com/jessevdk/go-flags v1.4.0/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI=
|
||||||
github.com/jrick/logrotate v1.0.0/go.mod h1:LNinyqDIJnpAur+b8yyulnQw/wDuN1+BYKlTRt3OuAQ=
|
github.com/jrick/logrotate v1.0.0/go.mod h1:LNinyqDIJnpAur+b8yyulnQw/wDuN1+BYKlTRt3OuAQ=
|
||||||
github.com/jtolds/gls v4.2.1+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU=
|
github.com/jtolds/gls v4.2.1+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU=
|
||||||
@ -532,7 +529,6 @@ github.com/multiformats/go-multistream v0.1.0/go.mod h1:fJTiDfXJVmItycydCnNx4+wS
|
|||||||
github.com/multiformats/go-varint v0.0.1 h1:TR/0rdQtnNxuN2IhiB639xC3tWM4IUi7DkTBVTdGW/M=
|
github.com/multiformats/go-varint v0.0.1 h1:TR/0rdQtnNxuN2IhiB639xC3tWM4IUi7DkTBVTdGW/M=
|
||||||
github.com/multiformats/go-varint v0.0.1/go.mod h1:3Ls8CIEsrijN6+B7PbrXRPxHRPuXSrVKRY101jdMZYE=
|
github.com/multiformats/go-varint v0.0.1/go.mod h1:3Ls8CIEsrijN6+B7PbrXRPxHRPuXSrVKRY101jdMZYE=
|
||||||
github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
|
github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
|
||||||
github.com/nkovacs/streamquote v0.0.0-20170412213628-49af9bddb229 h1:E2B8qYyeSgv5MXpmzZXRNp8IAQ4vjxIjhpAf5hv/tAg=
|
|
||||||
github.com/nkovacs/streamquote v0.0.0-20170412213628-49af9bddb229/go.mod h1:0aYXnNPJ8l7uZxf45rWW1a/uME32OF0rhiYGNQ2oF2E=
|
github.com/nkovacs/streamquote v0.0.0-20170412213628-49af9bddb229/go.mod h1:0aYXnNPJ8l7uZxf45rWW1a/uME32OF0rhiYGNQ2oF2E=
|
||||||
github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
|
github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
|
||||||
github.com/onsi/ginkgo v1.7.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
|
github.com/onsi/ginkgo v1.7.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
|
||||||
@ -607,9 +603,7 @@ github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81P
|
|||||||
github.com/syndtr/goleveldb v1.0.0/go.mod h1:ZVVdQEZoIme9iO1Ch2Jdy24qqXrMMOU6lpPAyBWyWuQ=
|
github.com/syndtr/goleveldb v1.0.0/go.mod h1:ZVVdQEZoIme9iO1Ch2Jdy24qqXrMMOU6lpPAyBWyWuQ=
|
||||||
github.com/ugorji/go/codec v0.0.0-20181204163529-d75b2dcb6bc8/go.mod h1:VFNgLljTbGfSG7qAOspJ7OScBnGdDN/yBr0sguwnwf0=
|
github.com/ugorji/go/codec v0.0.0-20181204163529-d75b2dcb6bc8/go.mod h1:VFNgLljTbGfSG7qAOspJ7OScBnGdDN/yBr0sguwnwf0=
|
||||||
github.com/urfave/cli v1.20.0/go.mod h1:70zkFmudgCuE/ngEzBv17Jvp/497gISqfk5gWijbERA=
|
github.com/urfave/cli v1.20.0/go.mod h1:70zkFmudgCuE/ngEzBv17Jvp/497gISqfk5gWijbERA=
|
||||||
github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw=
|
|
||||||
github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc=
|
github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc=
|
||||||
github.com/valyala/fasttemplate v1.0.1 h1:tY9CJiPnMXf1ERmG2EyK7gNUd+c6RKGD0IfU8WdUSz8=
|
|
||||||
github.com/valyala/fasttemplate v1.0.1/go.mod h1:UQGH1tvbgY+Nz5t2n7tXsz52dQxojPUpymEIMZ47gx8=
|
github.com/valyala/fasttemplate v1.0.1/go.mod h1:UQGH1tvbgY+Nz5t2n7tXsz52dQxojPUpymEIMZ47gx8=
|
||||||
github.com/warpfork/go-wish v0.0.0-20180510122957-5ad1f5abf436/go.mod h1:x6AKhvSSexNrVSrViXSHUEbICjmGXhtgABaHIySUSGw=
|
github.com/warpfork/go-wish v0.0.0-20180510122957-5ad1f5abf436/go.mod h1:x6AKhvSSexNrVSrViXSHUEbICjmGXhtgABaHIySUSGw=
|
||||||
github.com/warpfork/go-wish v0.0.0-20190328234359-8b3e70f8e830 h1:8kxMKmKzXXL4Ru1nyhvdms/JjWt+3YLpvRb/bAjO/y0=
|
github.com/warpfork/go-wish v0.0.0-20190328234359-8b3e70f8e830 h1:8kxMKmKzXXL4Ru1nyhvdms/JjWt+3YLpvRb/bAjO/y0=
|
||||||
|
@ -11,7 +11,6 @@ import (
|
|||||||
"github.com/ipfs/go-datastore"
|
"github.com/ipfs/go-datastore"
|
||||||
blockstore "github.com/ipfs/go-ipfs-blockstore"
|
blockstore "github.com/ipfs/go-ipfs-blockstore"
|
||||||
"github.com/libp2p/go-libp2p-core/host"
|
"github.com/libp2p/go-libp2p-core/host"
|
||||||
peer "github.com/libp2p/go-libp2p-core/peer"
|
|
||||||
"github.com/libp2p/go-libp2p-core/routing"
|
"github.com/libp2p/go-libp2p-core/routing"
|
||||||
pubsub "github.com/libp2p/go-libp2p-pubsub"
|
pubsub "github.com/libp2p/go-libp2p-pubsub"
|
||||||
"go.uber.org/fx"
|
"go.uber.org/fx"
|
||||||
@ -127,8 +126,8 @@ func SetGenesis(cs *store.ChainStore, g Genesis) error {
|
|||||||
return cs.SetGenesis(genesis)
|
return cs.SetGenesis(genesis)
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewSyncer(lc fx.Lifecycle, sm *stmgr.StateManager, bsync *blocksync.BlockSync, self peer.ID) (*chain.Syncer, error) {
|
func NewSyncer(lc fx.Lifecycle, sm *stmgr.StateManager, bsync *blocksync.BlockSync, h host.Host) (*chain.Syncer, error) {
|
||||||
syncer, err := chain.NewSyncer(sm, bsync, self)
|
syncer, err := chain.NewSyncer(sm, bsync, h.ConnManager(), h.ID())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user