lotus/lib/increadtimeout/incrt.go

74 lines
1.4 KiB
Go
Raw Permalink Normal View History

package incrt
import (
"io"
"time"
logging "github.com/ipfs/go-log/v2"
2020-07-10 14:43:14 +00:00
"github.com/filecoin-project/lotus/build"
)
// log is the package-level logger, created under the "incrt" logger name.
var log = logging.Logger("incrt")
// ReaderDeadline is a byte source whose reads can be bounded by a deadline.
// It pairs the io.Reader-shaped Read method with SetReadDeadline.
type ReaderDeadline interface {
	// Read reads into p and reports the number of bytes read and any error.
	Read(p []byte) (n int, err error)
	// SetReadDeadline sets the deadline applied to Read calls. This package
	// passes the zero time.Time after each read to clear the deadline it set.
	SetReadDeadline(t time.Time) error
}
// incrt is a reader wrapper that keeps a time budget for its reads: the
// budget is spent by the time each successful read takes and refilled in
// proportion to the number of bytes that read returned.
type incrt struct {
	// rd is the wrapped reader that deadlines are applied to.
	rd ReaderDeadline

	// waitPerByte is the amount of budget credited for every byte read.
	waitPerByte time.Duration

	// wait is the budget currently remaining; maxWait is the upper bound
	// that wait is clamped to after each successful read.
	wait, maxWait time.Duration
}
// New creates an Incremental Reader Timeout, with minimum sustained speed of
// minSpeed bytes per second and with maximum wait of maxWait.
//
// The returned reader wraps rd and starts with a full budget of maxWait.
// Each byte read credits the budget with time.Second/minSpeed; because this
// is an integer division, a minSpeed above 1e9 yields a zero per-byte credit.
//
// A minSpeed that is zero or negative is treated as 1 byte per second, the
// slowest rate that can be expressed, instead of panicking on a division by
// zero or producing a negative per-byte credit.
func New(rd ReaderDeadline, minSpeed int64, maxWait time.Duration) io.Reader {
	if minSpeed < 1 {
		minSpeed = 1
	}

	return &incrt{
		rd:          rd,
		waitPerByte: time.Second / time.Duration(minSpeed),
		wait:        maxWait,
		maxWait:     maxWait,
	}
}
// errNoWait is the error value used when no wait time is left for a read.
type errNoWait struct{}

// Error implements the error interface.
func (errNoWait) Error() string { return "wait time exceeded" }

// Timeout always reports true, so callers that probe errors for a
// Timeout() bool method see this error as a timeout.
func (errNoWait) Timeout() bool { return true }
func (crt *incrt) Read(buf []byte) (int, error) {
2020-07-10 14:43:14 +00:00
start := build.Clock.Now()
if crt.wait == 0 {
return 0, errNoWait{}
}
err := crt.rd.SetReadDeadline(start.Add(crt.wait))
if err != nil {
log.Debugf("unable to set deadline: %+v", err)
}
n, err := crt.rd.Read(buf)
_ = crt.rd.SetReadDeadline(time.Time{})
if err == nil {
2020-07-10 14:43:14 +00:00
dur := build.Clock.Now().Sub(start)
crt.wait -= dur
crt.wait += time.Duration(n) * crt.waitPerByte
if crt.wait < 0 {
crt.wait = 0
}
if crt.wait > crt.maxWait {
crt.wait = crt.maxWait
}
}
return n, err
}