Merge pull request #1142 from filecoin-project/feat/bs2

Add incremental timeouts to blocksync fetching
This commit is contained in:
Jakub Sztandera 2020-01-23 12:44:08 -08:00 committed by GitHub
commit cd14a5eb40
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 82 additions and 12 deletions

View File

@ -13,7 +13,6 @@ import (
"github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/chain/types"
"github.com/ipfs/go-cid" "github.com/ipfs/go-cid"
cbor "github.com/ipfs/go-ipld-cbor"
logging "github.com/ipfs/go-log/v2" logging "github.com/ipfs/go-log/v2"
inet "github.com/libp2p/go-libp2p-core/network" inet "github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer" "github.com/libp2p/go-libp2p-core/peer"
@ -25,12 +24,6 @@ type NewStreamFunc func(context.Context, peer.ID, ...protocol.ID) (inet.Stream,
const BlockSyncProtocolID = "/fil/sync/blk/0.0.1" const BlockSyncProtocolID = "/fil/sync/blk/0.0.1"
func init() {
cbor.RegisterCborType(BlockSyncRequest{})
cbor.RegisterCborType(BlockSyncResponse{})
cbor.RegisterCborType(BSTipSet{})
}
type BlockSyncService struct { type BlockSyncService struct {
cs *store.ChainStore cs *store.ChainStore
} }
@ -55,8 +48,8 @@ func ParseBSOptions(optfield uint64) *BSOptions {
} }
const ( const (
BSOptBlocks = 1 << 0 BSOptBlocks = 1 << iota
BSOptMessages = 1 << 1 BSOptMessages
) )
type BlockSyncResponse struct { type BlockSyncResponse struct {

View File

@ -21,6 +21,7 @@ import (
cborutil "github.com/filecoin-project/go-cbor-util" cborutil "github.com/filecoin-project/go-cbor-util"
"github.com/filecoin-project/lotus/chain/store" "github.com/filecoin-project/lotus/chain/store"
"github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/chain/types"
incrt "github.com/filecoin-project/lotus/lib/increadtimeout"
"github.com/filecoin-project/lotus/node/modules/dtypes" "github.com/filecoin-project/lotus/node/modules/dtypes"
"github.com/filecoin-project/lotus/peermgr" "github.com/filecoin-project/lotus/peermgr"
) )
@ -241,16 +242,18 @@ func (bs *BlockSync) sendRequestToPeer(ctx context.Context, p peer.ID, req *Bloc
bs.RemovePeer(p) bs.RemovePeer(p)
return nil, xerrors.Errorf("failed to open stream to peer: %w", err) return nil, xerrors.Errorf("failed to open stream to peer: %w", err)
} }
s.SetDeadline(time.Now().Add(10 * time.Second)) s.SetWriteDeadline(time.Now().Add(5 * time.Second))
defer s.SetDeadline(time.Time{})
if err := cborutil.WriteCborRPC(s, req); err != nil { if err := cborutil.WriteCborRPC(s, req); err != nil {
s.SetWriteDeadline(time.Time{})
bs.syncPeers.logFailure(p, time.Since(start)) bs.syncPeers.logFailure(p, time.Since(start))
return nil, err return nil, err
} }
s.SetWriteDeadline(time.Time{})
var res BlockSyncResponse var res BlockSyncResponse
if err := cborutil.ReadCborRPC(bufio.NewReader(s), &res); err != nil { r := incrt.New(s, 50<<10, 5*time.Second)
if err := cborutil.ReadCborRPC(bufio.NewReader(r), &res); err != nil {
bs.syncPeers.logFailure(p, time.Since(start)) bs.syncPeers.logFailure(p, time.Since(start))
return nil, err return nil, err
} }

1
go.sum
View File

@ -719,6 +719,7 @@ github.com/whyrusleeping/chunker v0.0.0-20181014151217-fe64bd25879f h1:jQa4QT2UP
github.com/whyrusleeping/chunker v0.0.0-20181014151217-fe64bd25879f/go.mod h1:p9UJB6dDgdPgMJZs7UjUOdulKyRr9fqkS+6JKAInPy8= github.com/whyrusleeping/chunker v0.0.0-20181014151217-fe64bd25879f/go.mod h1:p9UJB6dDgdPgMJZs7UjUOdulKyRr9fqkS+6JKAInPy8=
github.com/whyrusleeping/go-keyspace v0.0.0-20160322163242-5b898ac5add1 h1:EKhdznlJHPMoKr0XTrX+IlJs1LH3lyx2nfr1dOlZ79k= github.com/whyrusleeping/go-keyspace v0.0.0-20160322163242-5b898ac5add1 h1:EKhdznlJHPMoKr0XTrX+IlJs1LH3lyx2nfr1dOlZ79k=
github.com/whyrusleeping/go-keyspace v0.0.0-20160322163242-5b898ac5add1/go.mod h1:8UvriyWtv5Q5EOgjHaSseUEdkQfvwFv1I/In/O2M9gc= github.com/whyrusleeping/go-keyspace v0.0.0-20160322163242-5b898ac5add1/go.mod h1:8UvriyWtv5Q5EOgjHaSseUEdkQfvwFv1I/In/O2M9gc=
github.com/whyrusleeping/go-logging v0.0.0-20170515211332-0457bb6b88fc h1:9lDbC6Rz4bwmou+oE6Dt4Cb2BGMur5eR/GYptkKUVHo=
github.com/whyrusleeping/go-logging v0.0.0-20170515211332-0457bb6b88fc/go.mod h1:bopw91TMyo8J3tvftk8xmU2kPmlrt4nScJQZU2hE5EM= github.com/whyrusleeping/go-logging v0.0.0-20170515211332-0457bb6b88fc/go.mod h1:bopw91TMyo8J3tvftk8xmU2kPmlrt4nScJQZU2hE5EM=
github.com/whyrusleeping/go-notifier v0.0.0-20170827234753-097c5d47330f/go.mod h1:cZNvX9cFybI01GriPRMXDtczuvUhgbcYr9iCGaNlRv8= github.com/whyrusleeping/go-notifier v0.0.0-20170827234753-097c5d47330f/go.mod h1:cZNvX9cFybI01GriPRMXDtczuvUhgbcYr9iCGaNlRv8=
github.com/whyrusleeping/go-smux-multiplex v3.0.16+incompatible/go.mod h1:34LEDbeKFZInPUrAG+bjuJmUXONGdEFW7XL0SpTY1y4= github.com/whyrusleeping/go-smux-multiplex v3.0.16+incompatible/go.mod h1:34LEDbeKFZInPUrAG+bjuJmUXONGdEFW7XL0SpTY1y4=

View File

@ -0,0 +1,73 @@
package incrt
import (
"io"
"time"
logging "github.com/ipfs/go-log/v2"
)
var log = logging.Logger("incrt")
var now = time.Now
type ReaderDeadline interface {
Read([]byte) (int, error)
SetReadDeadline(time.Time) error
}
type incrt struct {
rd ReaderDeadline
waitPerByte time.Duration
wait time.Duration
maxWait time.Duration
}
// New creates an Incremental Reader Timeout, with minimum sustained speed of
// minSpeed bytes per second and with maximum wait of maxWait
func New(rd ReaderDeadline, minSpeed int64, maxWait time.Duration) io.Reader {
return &incrt{
rd: rd,
waitPerByte: time.Second / time.Duration(minSpeed),
wait: maxWait,
maxWait: maxWait,
}
}
type errNoWait struct{}
func (err errNoWait) Error() string {
return "wait time exceeded"
}
func (err errNoWait) Timeout() bool {
return true
}
func (crt *incrt) Read(buf []byte) (int, error) {
start := now()
if crt.wait == 0 {
return 0, errNoWait{}
}
err := crt.rd.SetReadDeadline(start.Add(crt.wait))
if err != nil {
log.Warnf("unable to set daedline: %+v", err)
}
n, err := crt.rd.Read(buf)
crt.rd.SetReadDeadline(time.Time{})
if err == nil {
dur := now().Sub(start)
crt.wait -= dur
crt.wait += time.Duration(n) * crt.waitPerByte
if crt.wait < 0 {
crt.wait = 0
}
if crt.wait > crt.maxWait {
crt.wait = crt.maxWait
}
}
return n, err
}