diff --git a/chain/blocksync/blocksync.go b/chain/blocksync/blocksync.go index b810c7783..ba8c64ebf 100644 --- a/chain/blocksync/blocksync.go +++ b/chain/blocksync/blocksync.go @@ -8,12 +8,11 @@ import ( "go.opencensus.io/trace" "golang.org/x/xerrors" - "github.com/filecoin-project/go-cbor-util" + cborutil "github.com/filecoin-project/go-cbor-util" "github.com/filecoin-project/lotus/chain/store" "github.com/filecoin-project/lotus/chain/types" "github.com/ipfs/go-cid" - cbor "github.com/ipfs/go-ipld-cbor" logging "github.com/ipfs/go-log/v2" inet "github.com/libp2p/go-libp2p-core/network" "github.com/libp2p/go-libp2p-core/peer" @@ -25,12 +24,6 @@ type NewStreamFunc func(context.Context, peer.ID, ...protocol.ID) (inet.Stream, const BlockSyncProtocolID = "/fil/sync/blk/0.0.1" -func init() { - cbor.RegisterCborType(BlockSyncRequest{}) - cbor.RegisterCborType(BlockSyncResponse{}) - cbor.RegisterCborType(BSTipSet{}) -} - type BlockSyncService struct { cs *store.ChainStore } @@ -55,8 +48,8 @@ func ParseBSOptions(optfield uint64) *BSOptions { } const ( - BSOptBlocks = 1 << 0 - BSOptMessages = 1 << 1 + BSOptBlocks = 1 << iota + BSOptMessages ) type BlockSyncResponse struct { diff --git a/chain/blocksync/blocksync_client.go b/chain/blocksync/blocksync_client.go index f3d681e46..885ed2de7 100644 --- a/chain/blocksync/blocksync_client.go +++ b/chain/blocksync/blocksync_client.go @@ -18,9 +18,10 @@ import ( "go.opencensus.io/trace" "golang.org/x/xerrors" - "github.com/filecoin-project/go-cbor-util" + cborutil "github.com/filecoin-project/go-cbor-util" "github.com/filecoin-project/lotus/chain/store" "github.com/filecoin-project/lotus/chain/types" + incrt "github.com/filecoin-project/lotus/lib/increadtimeout" "github.com/filecoin-project/lotus/node/modules/dtypes" "github.com/filecoin-project/lotus/peermgr" ) @@ -241,16 +242,19 @@ func (bs *BlockSync) sendRequestToPeer(ctx context.Context, p peer.ID, req *Bloc bs.RemovePeer(p) return nil, xerrors.Errorf("failed to open stream to peer: %w", err) } - s.SetDeadline(time.Now().Add(10 * time.Second)) - defer s.SetDeadline(time.Time{}) + s.SetWriteDeadline(time.Now().Add(5 * time.Second)) if err := cborutil.WriteCborRPC(s, req); err != nil { + s.SetWriteDeadline(time.Time{}) bs.syncPeers.logFailure(p, time.Since(start)) return nil, err } + s.SetWriteDeadline(time.Time{}) var res BlockSyncResponse - if err := cborutil.ReadCborRPC(bufio.NewReader(s), &res); err != nil { + r := incrt.New(s, 50<<10, 5*time.Second) + + if err := cborutil.ReadCborRPC(bufio.NewReader(r), &res); err != nil { bs.syncPeers.logFailure(p, time.Since(start)) return nil, err } diff --git a/go.mod b/go.mod index 914691b7f..0660c07b5 100644 --- a/go.mod +++ b/go.mod @@ -87,6 +87,7 @@ require ( github.com/stretchr/testify v1.4.0 github.com/whyrusleeping/bencher v0.0.0-20190829221104-bb6607aa8bba github.com/whyrusleeping/cbor-gen v0.0.0-20200121162646-b63bacf5eaf8 + github.com/whyrusleeping/go-logging v0.0.0-20170515211332-0457bb6b88fc github.com/whyrusleeping/multiaddr-filter v0.0.0-20160516205228-e903e4adabd7 github.com/whyrusleeping/pubsub v0.0.0-20131020042734-02de8aa2db3d go.opencensus.io v0.22.2 diff --git a/go.sum b/go.sum index 47f2659a5..25110e48c 100644 --- a/go.sum +++ b/go.sum @@ -717,6 +717,7 @@ github.com/whyrusleeping/chunker v0.0.0-20181014151217-fe64bd25879f h1:jQa4QT2UP github.com/whyrusleeping/chunker v0.0.0-20181014151217-fe64bd25879f/go.mod h1:p9UJB6dDgdPgMJZs7UjUOdulKyRr9fqkS+6JKAInPy8= github.com/whyrusleeping/go-keyspace v0.0.0-20160322163242-5b898ac5add1 h1:EKhdznlJHPMoKr0XTrX+IlJs1LH3lyx2nfr1dOlZ79k= github.com/whyrusleeping/go-keyspace v0.0.0-20160322163242-5b898ac5add1/go.mod h1:8UvriyWtv5Q5EOgjHaSseUEdkQfvwFv1I/In/O2M9gc= +github.com/whyrusleeping/go-logging v0.0.0-20170515211332-0457bb6b88fc h1:9lDbC6Rz4bwmou+oE6Dt4Cb2BGMur5eR/GYptkKUVHo= github.com/whyrusleeping/go-logging v0.0.0-20170515211332-0457bb6b88fc/go.mod h1:bopw91TMyo8J3tvftk8xmU2kPmlrt4nScJQZU2hE5EM= github.com/whyrusleeping/go-notifier v0.0.0-20170827234753-097c5d47330f/go.mod h1:cZNvX9cFybI01GriPRMXDtczuvUhgbcYr9iCGaNlRv8= github.com/whyrusleeping/go-smux-multiplex v3.0.16+incompatible/go.mod h1:34LEDbeKFZInPUrAG+bjuJmUXONGdEFW7XL0SpTY1y4= diff --git a/lib/increadtimeout/incrt.go b/lib/increadtimeout/incrt.go new file mode 100644 index 000000000..bb209ee8f --- /dev/null +++ b/lib/increadtimeout/incrt.go @@ -0,0 +1,63 @@ +package incrt + +import ( + "io" + "time" + + logging "github.com/ipfs/go-log/v2" +) + +var log = logging.Logger("incrt") + +var now = time.Now + +type ReaderDeadline interface { + Read([]byte) (int, error) + SetReadDeadline(time.Time) error +} + +type incrt struct { + rd ReaderDeadline + + waitPerByte time.Duration + wait time.Duration + maxWait time.Duration +} + +// New creates an Incremental Reader Timeout, with minimum sustained speed of +// minSpeed bytes per second and with maximum wait of maxWait +func New(rd ReaderDeadline, minSpeed int64, maxWait time.Duration) io.Reader { + return &incrt{ + rd: rd, + waitPerByte: time.Second / time.Duration(minSpeed), + wait: maxWait, + maxWait: maxWait, + } +} + +func (crt *incrt) Read(buf []byte) (n int, err error) { + start := now() + err = crt.rd.SetReadDeadline(start.Add(crt.wait)) + if err != nil { + return 0, err + } + + defer func() { + crt.rd.SetReadDeadline(time.Time{}) + if err == nil { + dur := now().Sub(start) + crt.wait -= dur + crt.wait += time.Duration(n) * crt.waitPerByte + if crt.wait < 0 { + crt.wait = 0 + } + if crt.wait > crt.maxWait { + crt.wait = crt.maxWait + } + log.Warnf("max wait: %s, dur: %s, n: %d", crt.wait.String(), dur.String(), n) + } + }() + + n, err = crt.rd.Read(buf) + return n, err +}