Don't try syncing to a chain if its not clearly heavier

This commit is contained in:
whyrusleeping 2019-11-09 12:14:40 -08:00
parent ea13bce02d
commit bda6d7e119
5 changed files with 62 additions and 19 deletions

View File

@ -130,7 +130,7 @@ func (bss *BlockSyncService) processRequest(ctx context.Context, req *BlockSyncR
chain, err := bss.collectChainSegment(req.Start, req.RequestLength, opts)
if err != nil {
log.Error("encountered error while responding to block sync request: ", err)
log.Warn("encountered error while responding to block sync request: ", err)
return &BlockSyncResponse{
Status: 203,
}, nil
@ -149,7 +149,7 @@ func (bss *BlockSyncService) collectChainSegment(start []cid.Cid, length uint64,
var bst BSTipSet
ts, err := bss.cs.LoadTipSet(cur)
if err != nil {
return nil, err
return nil, xerrors.Errorf("failed loading tipset %s: %w", cur, err)
}
if opts.IncludeMessages {
@ -222,8 +222,8 @@ func (bss *BlockSyncService) gatherMessages(ts *types.TipSet) ([]*types.Message,
}
type BlockSync struct {
bserv bserv.BlockService
newStream NewStreamFunc
bserv bserv.BlockService
host host.Host
syncPeersLk sync.Mutex
syncPeers map[peer.ID]struct{}
@ -232,21 +232,11 @@ type BlockSync struct {
func NewBlockSyncClient(bserv dtypes.ChainBlockService, h host.Host) *BlockSync {
return &BlockSync{
bserv: bserv,
newStream: h.NewStream,
host: h,
syncPeers: make(map[peer.ID]struct{}),
}
}
func (bs *BlockSync) getPeers() []peer.ID {
bs.syncPeersLk.Lock()
defer bs.syncPeersLk.Unlock()
var out []peer.ID
for p := range bs.syncPeers {
out = append(out, p)
}
return out
}
func (bs *BlockSync) processStatus(req *BlockSyncRequest, res *BlockSyncResponse) error {
switch res.Status {
case 101: // Partial Response
@ -396,8 +386,18 @@ func bstsToFullTipSet(bts *BSTipSet) (*store.FullTipSet, error) {
}
func (bs *BlockSync) sendRequestToPeer(ctx context.Context, p peer.ID, req *BlockSyncRequest) (*BlockSyncResponse, error) {
s, err := bs.newStream(inet.WithNoDial(ctx, "should already have connection"), p, BlockSyncProtocolID)
ctx, span := trace.StartSpan(ctx, "sendRequestToPeer")
defer span.End()
if span.IsRecordingEvents() {
span.AddAttributes(
trace.StringAttribute("peer", p.Pretty()),
)
}
s, err := bs.host.NewStream(inet.WithNoDial(ctx, "should already have connection"), p, BlockSyncProtocolID)
if err != nil {
bs.RemovePeer(p)
return nil, err
}
@ -452,6 +452,26 @@ func (bs *BlockSync) AddPeer(p peer.ID) {
bs.syncPeers[p] = struct{}{}
}
func (bs *BlockSync) RemovePeer(p peer.ID) {
bs.syncPeersLk.Lock()
defer bs.syncPeersLk.Unlock()
delete(bs.syncPeers, p)
}
func (bs *BlockSync) getPeers() []peer.ID {
bs.syncPeersLk.Lock()
defer bs.syncPeersLk.Unlock()
var out []peer.ID
for p := range bs.syncPeers {
out = append(out, p)
}
return out
}
func (bs *BlockSync) logPeerQuality(p peer.ID) {
}
func (bs *BlockSync) FetchMessagesByCids(ctx context.Context, cids []cid.Cid) ([]*types.Message, error) {
out := make([]*types.Message, len(cids))

View File

@ -231,7 +231,7 @@ func (cs *ChainStore) MaybeTakeHeavierTipSet(ctx context.Context, ts *types.TipS
// TODO: don't do this for initial sync. Now that we don't have a
// difference between 'bootstrap sync' and 'caught up' sync, we need
// some other heuristic.
return cs.takeHeaviestTipSet(ts)
return cs.takeHeaviestTipSet(ctx, ts)
}
return nil
}
@ -267,7 +267,10 @@ func (cs *ChainStore) reorgWorker(ctx context.Context) chan<- reorg {
return out
}
func (cs *ChainStore) takeHeaviestTipSet(ts *types.TipSet) error {
func (cs *ChainStore) takeHeaviestTipSet(ctx context.Context, ts *types.TipSet) error {
ctx, span := trace.StartSpan(ctx, "takeHeaviestTipSet")
defer span.End()
if cs.heaviest != nil { // buf
if len(cs.reorgCh) > 0 {
log.Warnf("Reorg channel running behind, %d reorgs buffered", len(cs.reorgCh))
@ -280,6 +283,8 @@ func (cs *ChainStore) takeHeaviestTipSet(ts *types.TipSet) error {
log.Warnf("no heaviest tipset found, using %s", ts.Cids())
}
span.AddAttributes(trace.BoolAttribute("newHead", true))
log.Debugf("New heaviest tipset! %s", ts.Cids())
cs.heaviest = ts
@ -296,7 +301,7 @@ func (cs *ChainStore) takeHeaviestTipSet(ts *types.TipSet) error {
func (cs *ChainStore) SetHead(ts *types.TipSet) error {
cs.heaviestLk.Lock()
defer cs.heaviestLk.Unlock()
return cs.takeHeaviestTipSet(ts)
return cs.takeHeaviestTipSet(context.TODO(), ts)
}
func (cs *ChainStore) Contains(ts *types.TipSet) (bool, error) {

View File

@ -111,11 +111,18 @@ func (syncer *Syncer) InformNewHead(from peer.ID, fts *store.FullTipSet) {
return
}
syncer.peerHeadsLk.Lock()
syncer.peerHeads[from] = fts.TipSet()
syncer.peerHeadsLk.Unlock()
syncer.Bsync.AddPeer(from)
bestPweight := syncer.store.GetHeaviestTipSet().Blocks()[0].ParentWeight
if fts.TipSet().Blocks()[0].ParentWeight.LessThan(bestPweight) {
log.Warn("incoming tipset does not appear to be better than our best chain, ignoring for now")
return
}
go func() {
if err := syncer.Sync(ctx, fts.TipSet()); err != nil {
log.Errorf("sync error: %+v", err)
@ -724,6 +731,8 @@ loop:
log.Errorf("failed to get blocks: %+v", err)
span.AddAttributes(trace.StringAttribute("error", err.Error()))
// This error will only be logged above,
return nil, xerrors.Errorf("failed to get blocks: %w", err)
}
@ -906,6 +915,8 @@ func (syncer *Syncer) collectChain(ctx context.Context, ts *types.TipSet) error
return err
}
span.AddAttributes(trace.Int64Attribute("syncChainLength", int64(len(headers))))
if !headers[0].Equals(ts) {
log.Errorf("collectChain headers[0] should be equal to sync target. Its not: %s != %s", headers[0].Cids(), ts.Cids())
}

5
go.sum
View File

@ -22,7 +22,9 @@ github.com/Stebalien/go-bitfield v0.0.1/go.mod h1:GNjFpasyUVkHMsfEOk8EFLJ9syQ6SI
github.com/aead/siphash v1.0.1/go.mod h1:Nywa3cDsYNNK3gaciGTWPwHt0wlpNV15vwmswBAUSII=
github.com/akavel/rsrc v0.8.0 h1:zjWn7ukO9Kc5Q62DOJCcxGpXC18RawVtYAGdz2aLlfw=
github.com/akavel/rsrc v0.8.0/go.mod h1:uLoCtb9J+EyAqh+26kdrTgmzRBFPGOolLWKpdxkKq+c=
github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc h1:cAKDfWh5VpdgMhJosfJnn5/FoN2SRZ4p7fJNX58YPaU=
github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf h1:qet1QNfXsQxTZqLG4oE62mJzwPIB8+Tee4RNCL9ulrY=
github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
github.com/apache/thrift v0.12.0 h1:pODnxUFNcjP9UTLZGTdeh+j16A8lJbRvD3rOtrk/7bs=
github.com/apache/thrift v0.12.0/go.mod h1:cp2SuWMxlEZw2r+iP2GNCdIi4C1qmUzdZFSVb+bacwQ=
@ -474,6 +476,7 @@ github.com/prometheus/client_golang v0.9.1/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXP
github.com/prometheus/client_golang v0.9.3-0.20190127221311-3c4408c8b829/go.mod h1:p2iRAGwDERtqlqzRXnrOVns+ignqQo//hLXqYxZYVNs=
github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo=
github.com/prometheus/client_model v0.0.0-20190115171406-56726106282f/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo=
github.com/prometheus/common v0.2.0 h1:kUZDBDTdBVBYBj5Tmh2NZLlF60mfjA27rM34b+cVwNU=
github.com/prometheus/common v0.2.0/go.mod h1:TNfzLD0ON7rHzMJeJkieUDPYmFC7Snx/y86RQel1bk4=
github.com/prometheus/procfs v0.0.0-20181005140218-185b4288413d/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk=
github.com/prometheus/procfs v0.0.0-20190117184657-bf6a532e95b1/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk=
@ -481,6 +484,7 @@ github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqn
github.com/russross/blackfriday v1.5.2/go.mod h1:JO/DiYxRf+HjHt06OyowR9PTA263kcR/rfWxYHBV53g=
github.com/shirou/gopsutil v2.18.12+incompatible h1:1eaJvGomDnH74/5cF4CTmTbLHAriGFsTZppLXDX93OM=
github.com/shirou/gopsutil v2.18.12+incompatible/go.mod h1:5b4v6he4MtMOwMlS0TUMTu2PcXUg8+E1lC7eC3UO/RA=
github.com/sirupsen/logrus v1.2.0 h1:juTguoYk5qI21pwyTXY3B3Y5cOTH3ZUyZCg1v/mihuo=
github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d/go.mod h1:OnSkiWE9lh6wB0YB77sQom3nweQdgAjqCqsofrRNTgc=
github.com/smartystreets/assertions v1.0.0/go.mod h1:kHHU4qYBaI3q23Pp3VPrmWhuIUrLW/7eUrw0BU5VaoM=
@ -683,6 +687,7 @@ google.golang.org/genproto v0.0.0-20190502173448-54afdca5d873/go.mod h1:VzzqZJRn
google.golang.org/grpc v1.17.0/go.mod h1:6QZJwpn2B+Zp71q/5VxRsJ6NXXVCE5NRUHRo+f3cWCs=
google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c=
google.golang.org/grpc v1.20.1/go.mod h1:10oTOabMzJvdu6/UiuZezV6QK5dSlG84ov/aaiqXj38=
gopkg.in/alecthomas/kingpin.v2 v2.2.6 h1:jMFz6MfLP0/4fUyZle81rXUoxOBFi19VUFKVDOQfozc=
gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY=

View File

@ -142,6 +142,7 @@ func (p *post) doPost(ctx context.Context) error {
func (p *post) preparePost(ctx context.Context) error {
ctx, span := trace.StartSpan(ctx, "storage.preparePost")
defer span.End()
log.Info("preparePost")
sset, err := p.m.api.StateMinerProvingSet(ctx, p.m.maddr, p.ts)
if err != nil {
@ -251,6 +252,7 @@ func (p *post) waitCommit(ctx context.Context) error {
log.Warnf("SubmitPoSt EXIT: %d", rec.Receipt.ExitCode)
// TODO: Do something
}
log.Infof("Post made it on chain! (height=%d)", rec.TipSet.Height())
return nil
}