thread safe piecereader
This commit is contained in:
parent
d502eeba2b
commit
d0af0eff9f
20
extern/sector-storage/piece_reader.go
vendored
20
extern/sector-storage/piece_reader.go
vendored
@ -4,6 +4,7 @@ import (
|
|||||||
"bufio"
|
"bufio"
|
||||||
"context"
|
"context"
|
||||||
"io"
|
"io"
|
||||||
|
"sync"
|
||||||
|
|
||||||
"github.com/ipfs/go-cid"
|
"github.com/ipfs/go-cid"
|
||||||
"go.opencensus.io/stats"
|
"go.opencensus.io/stats"
|
||||||
@ -31,6 +32,7 @@ type pieceReader struct {
|
|||||||
closed bool
|
closed bool
|
||||||
seqAt int64 // next byte to be read by io.Reader
|
seqAt int64 // next byte to be read by io.Reader
|
||||||
|
|
||||||
|
mu sync.Mutex
|
||||||
r io.ReadCloser
|
r io.ReadCloser
|
||||||
br *bufio.Reader
|
br *bufio.Reader
|
||||||
rAt int64
|
rAt int64
|
||||||
@ -62,6 +64,9 @@ func (p *pieceReader) check() error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (p *pieceReader) Close() error {
|
func (p *pieceReader) Close() error {
|
||||||
|
p.mu.Lock()
|
||||||
|
defer p.mu.Unlock()
|
||||||
|
|
||||||
if err := p.check(); err != nil {
|
if err := p.check(); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -84,16 +89,22 @@ func (p *pieceReader) Close() error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (p *pieceReader) Read(b []byte) (int, error) {
|
func (p *pieceReader) Read(b []byte) (int, error) {
|
||||||
|
p.mu.Lock()
|
||||||
|
defer p.mu.Unlock()
|
||||||
|
|
||||||
if err := p.check(); err != nil {
|
if err := p.check(); err != nil {
|
||||||
return 0, err
|
return 0, err
|
||||||
}
|
}
|
||||||
|
|
||||||
n, err := p.ReadAt(b, p.seqAt)
|
n, err := p.readAtUnlocked(b, p.seqAt)
|
||||||
p.seqAt += int64(n)
|
p.seqAt += int64(n)
|
||||||
return n, err
|
return n, err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *pieceReader) Seek(offset int64, whence int) (int64, error) {
|
func (p *pieceReader) Seek(offset int64, whence int) (int64, error) {
|
||||||
|
p.mu.Lock()
|
||||||
|
defer p.mu.Unlock()
|
||||||
|
|
||||||
if err := p.check(); err != nil {
|
if err := p.check(); err != nil {
|
||||||
return 0, err
|
return 0, err
|
||||||
}
|
}
|
||||||
@ -113,6 +124,13 @@ func (p *pieceReader) Seek(offset int64, whence int) (int64, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (p *pieceReader) ReadAt(b []byte, off int64) (n int, err error) {
|
func (p *pieceReader) ReadAt(b []byte, off int64) (n int, err error) {
|
||||||
|
p.mu.Lock()
|
||||||
|
defer p.mu.Unlock()
|
||||||
|
|
||||||
|
return p.readAtUnlocked(b, off)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *pieceReader) readAtUnlocked(b []byte, off int64) (n int, err error) {
|
||||||
if err := p.check(); err != nil {
|
if err := p.check(); err != nil {
|
||||||
return 0, err
|
return 0, err
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user