Merge pull request #8397 from filecoin-project/feat/thread-safe-piecereader
Thread safe piecereader
This commit is contained in:
commit
ba2596e318
20
extern/sector-storage/piece_reader.go
vendored
20
extern/sector-storage/piece_reader.go
vendored
@ -4,6 +4,7 @@ import (
|
||||
"bufio"
|
||||
"context"
|
||||
"io"
|
||||
"sync"
|
||||
|
||||
"github.com/ipfs/go-cid"
|
||||
"go.opencensus.io/stats"
|
||||
@ -31,6 +32,7 @@ type pieceReader struct {
|
||||
closed bool
|
||||
seqAt int64 // next byte to be read by io.Reader
|
||||
|
||||
mu sync.Mutex
|
||||
r io.ReadCloser
|
||||
br *bufio.Reader
|
||||
rAt int64
|
||||
@ -62,6 +64,9 @@ func (p *pieceReader) check() error {
|
||||
}
|
||||
|
||||
func (p *pieceReader) Close() error {
|
||||
p.mu.Lock()
|
||||
defer p.mu.Unlock()
|
||||
|
||||
if err := p.check(); err != nil {
|
||||
return err
|
||||
}
|
||||
@ -84,16 +89,22 @@ func (p *pieceReader) Close() error {
|
||||
}
|
||||
|
||||
func (p *pieceReader) Read(b []byte) (int, error) {
|
||||
p.mu.Lock()
|
||||
defer p.mu.Unlock()
|
||||
|
||||
if err := p.check(); err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
n, err := p.ReadAt(b, p.seqAt)
|
||||
n, err := p.readAtUnlocked(b, p.seqAt)
|
||||
p.seqAt += int64(n)
|
||||
return n, err
|
||||
}
|
||||
|
||||
func (p *pieceReader) Seek(offset int64, whence int) (int64, error) {
|
||||
p.mu.Lock()
|
||||
defer p.mu.Unlock()
|
||||
|
||||
if err := p.check(); err != nil {
|
||||
return 0, err
|
||||
}
|
||||
@ -113,6 +124,13 @@ func (p *pieceReader) Seek(offset int64, whence int) (int64, error) {
|
||||
}
|
||||
|
||||
func (p *pieceReader) ReadAt(b []byte, off int64) (n int, err error) {
|
||||
p.mu.Lock()
|
||||
defer p.mu.Unlock()
|
||||
|
||||
return p.readAtUnlocked(b, off)
|
||||
}
|
||||
|
||||
func (p *pieceReader) readAtUnlocked(b []byte, off int64) (n int, err error) {
|
||||
if err := p.check(); err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user