Add incremental timeouts to blocksync fetching
Signed-off-by: Jakub Sztandera <kubuxu@protocol.ai>
This commit is contained in:
parent
4310cc23ce
commit
f5d061f34f
@ -8,12 +8,11 @@ import (
|
|||||||
"go.opencensus.io/trace"
|
"go.opencensus.io/trace"
|
||||||
"golang.org/x/xerrors"
|
"golang.org/x/xerrors"
|
||||||
|
|
||||||
"github.com/filecoin-project/go-cbor-util"
|
cborutil "github.com/filecoin-project/go-cbor-util"
|
||||||
"github.com/filecoin-project/lotus/chain/store"
|
"github.com/filecoin-project/lotus/chain/store"
|
||||||
"github.com/filecoin-project/lotus/chain/types"
|
"github.com/filecoin-project/lotus/chain/types"
|
||||||
|
|
||||||
"github.com/ipfs/go-cid"
|
"github.com/ipfs/go-cid"
|
||||||
cbor "github.com/ipfs/go-ipld-cbor"
|
|
||||||
logging "github.com/ipfs/go-log/v2"
|
logging "github.com/ipfs/go-log/v2"
|
||||||
inet "github.com/libp2p/go-libp2p-core/network"
|
inet "github.com/libp2p/go-libp2p-core/network"
|
||||||
"github.com/libp2p/go-libp2p-core/peer"
|
"github.com/libp2p/go-libp2p-core/peer"
|
||||||
@ -25,12 +24,6 @@ type NewStreamFunc func(context.Context, peer.ID, ...protocol.ID) (inet.Stream,
|
|||||||
|
|
||||||
const BlockSyncProtocolID = "/fil/sync/blk/0.0.1"
|
const BlockSyncProtocolID = "/fil/sync/blk/0.0.1"
|
||||||
|
|
||||||
func init() {
|
|
||||||
cbor.RegisterCborType(BlockSyncRequest{})
|
|
||||||
cbor.RegisterCborType(BlockSyncResponse{})
|
|
||||||
cbor.RegisterCborType(BSTipSet{})
|
|
||||||
}
|
|
||||||
|
|
||||||
type BlockSyncService struct {
|
type BlockSyncService struct {
|
||||||
cs *store.ChainStore
|
cs *store.ChainStore
|
||||||
}
|
}
|
||||||
@ -55,8 +48,8 @@ func ParseBSOptions(optfield uint64) *BSOptions {
|
|||||||
}
|
}
|
||||||
|
|
||||||
const (
|
const (
|
||||||
BSOptBlocks = 1 << 0
|
BSOptBlocks = 1 << iota
|
||||||
BSOptMessages = 1 << 1
|
BSOptMessages
|
||||||
)
|
)
|
||||||
|
|
||||||
type BlockSyncResponse struct {
|
type BlockSyncResponse struct {
|
||||||
|
@ -18,9 +18,10 @@ import (
|
|||||||
"go.opencensus.io/trace"
|
"go.opencensus.io/trace"
|
||||||
"golang.org/x/xerrors"
|
"golang.org/x/xerrors"
|
||||||
|
|
||||||
"github.com/filecoin-project/go-cbor-util"
|
cborutil "github.com/filecoin-project/go-cbor-util"
|
||||||
"github.com/filecoin-project/lotus/chain/store"
|
"github.com/filecoin-project/lotus/chain/store"
|
||||||
"github.com/filecoin-project/lotus/chain/types"
|
"github.com/filecoin-project/lotus/chain/types"
|
||||||
|
incrt "github.com/filecoin-project/lotus/lib/increadtimeout"
|
||||||
"github.com/filecoin-project/lotus/node/modules/dtypes"
|
"github.com/filecoin-project/lotus/node/modules/dtypes"
|
||||||
"github.com/filecoin-project/lotus/peermgr"
|
"github.com/filecoin-project/lotus/peermgr"
|
||||||
)
|
)
|
||||||
@ -241,16 +242,19 @@ func (bs *BlockSync) sendRequestToPeer(ctx context.Context, p peer.ID, req *Bloc
|
|||||||
bs.RemovePeer(p)
|
bs.RemovePeer(p)
|
||||||
return nil, xerrors.Errorf("failed to open stream to peer: %w", err)
|
return nil, xerrors.Errorf("failed to open stream to peer: %w", err)
|
||||||
}
|
}
|
||||||
s.SetDeadline(time.Now().Add(10 * time.Second))
|
s.SetWriteDeadline(time.Now().Add(5 * time.Second))
|
||||||
defer s.SetDeadline(time.Time{})
|
|
||||||
|
|
||||||
if err := cborutil.WriteCborRPC(s, req); err != nil {
|
if err := cborutil.WriteCborRPC(s, req); err != nil {
|
||||||
|
s.SetWriteDeadline(time.Time{})
|
||||||
bs.syncPeers.logFailure(p, time.Since(start))
|
bs.syncPeers.logFailure(p, time.Since(start))
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
s.SetWriteDeadline(time.Time{})
|
||||||
|
|
||||||
var res BlockSyncResponse
|
var res BlockSyncResponse
|
||||||
if err := cborutil.ReadCborRPC(bufio.NewReader(s), &res); err != nil {
|
r := incrt.New(s, 50<<10, 5*time.Second)
|
||||||
|
|
||||||
|
if err := cborutil.ReadCborRPC(bufio.NewReader(r), &res); err != nil {
|
||||||
bs.syncPeers.logFailure(p, time.Since(start))
|
bs.syncPeers.logFailure(p, time.Since(start))
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
1
go.mod
1
go.mod
@ -87,6 +87,7 @@ require (
|
|||||||
github.com/stretchr/testify v1.4.0
|
github.com/stretchr/testify v1.4.0
|
||||||
github.com/whyrusleeping/bencher v0.0.0-20190829221104-bb6607aa8bba
|
github.com/whyrusleeping/bencher v0.0.0-20190829221104-bb6607aa8bba
|
||||||
github.com/whyrusleeping/cbor-gen v0.0.0-20200121162646-b63bacf5eaf8
|
github.com/whyrusleeping/cbor-gen v0.0.0-20200121162646-b63bacf5eaf8
|
||||||
|
github.com/whyrusleeping/go-logging v0.0.0-20170515211332-0457bb6b88fc
|
||||||
github.com/whyrusleeping/multiaddr-filter v0.0.0-20160516205228-e903e4adabd7
|
github.com/whyrusleeping/multiaddr-filter v0.0.0-20160516205228-e903e4adabd7
|
||||||
github.com/whyrusleeping/pubsub v0.0.0-20131020042734-02de8aa2db3d
|
github.com/whyrusleeping/pubsub v0.0.0-20131020042734-02de8aa2db3d
|
||||||
go.opencensus.io v0.22.2
|
go.opencensus.io v0.22.2
|
||||||
|
1
go.sum
1
go.sum
@ -717,6 +717,7 @@ github.com/whyrusleeping/chunker v0.0.0-20181014151217-fe64bd25879f h1:jQa4QT2UP
|
|||||||
github.com/whyrusleeping/chunker v0.0.0-20181014151217-fe64bd25879f/go.mod h1:p9UJB6dDgdPgMJZs7UjUOdulKyRr9fqkS+6JKAInPy8=
|
github.com/whyrusleeping/chunker v0.0.0-20181014151217-fe64bd25879f/go.mod h1:p9UJB6dDgdPgMJZs7UjUOdulKyRr9fqkS+6JKAInPy8=
|
||||||
github.com/whyrusleeping/go-keyspace v0.0.0-20160322163242-5b898ac5add1 h1:EKhdznlJHPMoKr0XTrX+IlJs1LH3lyx2nfr1dOlZ79k=
|
github.com/whyrusleeping/go-keyspace v0.0.0-20160322163242-5b898ac5add1 h1:EKhdznlJHPMoKr0XTrX+IlJs1LH3lyx2nfr1dOlZ79k=
|
||||||
github.com/whyrusleeping/go-keyspace v0.0.0-20160322163242-5b898ac5add1/go.mod h1:8UvriyWtv5Q5EOgjHaSseUEdkQfvwFv1I/In/O2M9gc=
|
github.com/whyrusleeping/go-keyspace v0.0.0-20160322163242-5b898ac5add1/go.mod h1:8UvriyWtv5Q5EOgjHaSseUEdkQfvwFv1I/In/O2M9gc=
|
||||||
|
github.com/whyrusleeping/go-logging v0.0.0-20170515211332-0457bb6b88fc h1:9lDbC6Rz4bwmou+oE6Dt4Cb2BGMur5eR/GYptkKUVHo=
|
||||||
github.com/whyrusleeping/go-logging v0.0.0-20170515211332-0457bb6b88fc/go.mod h1:bopw91TMyo8J3tvftk8xmU2kPmlrt4nScJQZU2hE5EM=
|
github.com/whyrusleeping/go-logging v0.0.0-20170515211332-0457bb6b88fc/go.mod h1:bopw91TMyo8J3tvftk8xmU2kPmlrt4nScJQZU2hE5EM=
|
||||||
github.com/whyrusleeping/go-notifier v0.0.0-20170827234753-097c5d47330f/go.mod h1:cZNvX9cFybI01GriPRMXDtczuvUhgbcYr9iCGaNlRv8=
|
github.com/whyrusleeping/go-notifier v0.0.0-20170827234753-097c5d47330f/go.mod h1:cZNvX9cFybI01GriPRMXDtczuvUhgbcYr9iCGaNlRv8=
|
||||||
github.com/whyrusleeping/go-smux-multiplex v3.0.16+incompatible/go.mod h1:34LEDbeKFZInPUrAG+bjuJmUXONGdEFW7XL0SpTY1y4=
|
github.com/whyrusleeping/go-smux-multiplex v3.0.16+incompatible/go.mod h1:34LEDbeKFZInPUrAG+bjuJmUXONGdEFW7XL0SpTY1y4=
|
||||||
|
63
lib/increadtimeout/incrt.go
Normal file
63
lib/increadtimeout/incrt.go
Normal file
@ -0,0 +1,63 @@
|
|||||||
|
package incrt
|
||||||
|
|
||||||
|
import (
|
||||||
|
"io"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
logging "github.com/ipfs/go-log/v2"
|
||||||
|
)
|
||||||
|
|
||||||
|
var log = logging.Logger("incrt")
|
||||||
|
|
||||||
|
var now = time.Now
|
||||||
|
|
||||||
|
type ReaderDeadline interface {
|
||||||
|
Read([]byte) (int, error)
|
||||||
|
SetReadDeadline(time.Time) error
|
||||||
|
}
|
||||||
|
|
||||||
|
type incrt struct {
|
||||||
|
rd ReaderDeadline
|
||||||
|
|
||||||
|
waitPerByte time.Duration
|
||||||
|
wait time.Duration
|
||||||
|
maxWait time.Duration
|
||||||
|
}
|
||||||
|
|
||||||
|
// New creates an Incremental Reader Timeout, with minimum sustained speed of
|
||||||
|
// minSpeed bytes per second and with maximum wait of maxWait
|
||||||
|
func New(rd ReaderDeadline, minSpeed int64, maxWait time.Duration) io.Reader {
|
||||||
|
return &incrt{
|
||||||
|
rd: rd,
|
||||||
|
waitPerByte: time.Second / time.Duration(minSpeed),
|
||||||
|
wait: maxWait,
|
||||||
|
maxWait: maxWait,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (crt *incrt) Read(buf []byte) (n int, err error) {
|
||||||
|
start := now()
|
||||||
|
err = crt.rd.SetReadDeadline(start.Add(crt.wait))
|
||||||
|
if err != nil {
|
||||||
|
return 0, err
|
||||||
|
}
|
||||||
|
|
||||||
|
defer func() {
|
||||||
|
crt.rd.SetReadDeadline(time.Time{})
|
||||||
|
if err == nil {
|
||||||
|
dur := now().Sub(start)
|
||||||
|
crt.wait -= dur
|
||||||
|
crt.wait += time.Duration(n) * crt.waitPerByte
|
||||||
|
if crt.wait < 0 {
|
||||||
|
crt.wait = 0
|
||||||
|
}
|
||||||
|
if crt.wait > crt.maxWait {
|
||||||
|
crt.wait = crt.maxWait
|
||||||
|
}
|
||||||
|
log.Warnf("max wait: %s, dur: %s, n: %d", crt.wait.String(), dur.String(), n)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
n, err = crt.rd.Read(buf)
|
||||||
|
return n, err
|
||||||
|
}
|
Loading…
Reference in New Issue
Block a user