diff --git a/extern/sector-storage/piece_reader.go b/extern/sector-storage/piece_reader.go index d7a3f4e98..882a2a6d9 100644 --- a/extern/sector-storage/piece_reader.go +++ b/extern/sector-storage/piece_reader.go @@ -4,6 +4,7 @@ import ( "bufio" "context" "io" + "sync" "github.com/ipfs/go-cid" "go.opencensus.io/stats" @@ -31,6 +32,7 @@ type pieceReader struct { closed bool seqAt int64 // next byte to be read by io.Reader + mu sync.Mutex r io.ReadCloser br *bufio.Reader rAt int64 @@ -62,6 +64,9 @@ func (p *pieceReader) check() error { } func (p *pieceReader) Close() error { + p.mu.Lock() + defer p.mu.Unlock() + if err := p.check(); err != nil { return err } @@ -84,16 +89,22 @@ func (p *pieceReader) Close() error { } func (p *pieceReader) Read(b []byte) (int, error) { + p.mu.Lock() + defer p.mu.Unlock() + if err := p.check(); err != nil { return 0, err } - n, err := p.ReadAt(b, p.seqAt) + n, err := p.readAtUnlocked(b, p.seqAt) p.seqAt += int64(n) return n, err } func (p *pieceReader) Seek(offset int64, whence int) (int64, error) { + p.mu.Lock() + defer p.mu.Unlock() + if err := p.check(); err != nil { return 0, err } @@ -113,6 +124,13 @@ func (p *pieceReader) Seek(offset int64, whence int) (int64, error) { } func (p *pieceReader) ReadAt(b []byte, off int64) (n int, err error) { + p.mu.Lock() + defer p.mu.Unlock() + + return p.readAtUnlocked(b, off) +} + +func (p *pieceReader) readAtUnlocked(b []byte, off int64) (n int, err error) { if err := p.check(); err != nil { return 0, err }