From d0af0eff9f75d471455a7015f6c00fb4bfb2ec3a Mon Sep 17 00:00:00 2001 From: Aarsh Shah Date: Tue, 29 Mar 2022 19:41:06 +0530 Subject: [PATCH] thread safe piecereader --- extern/sector-storage/piece_reader.go | 20 +++++++++++++++++++- 1 file changed, 19 insertions(+), 1 deletion(-) diff --git a/extern/sector-storage/piece_reader.go b/extern/sector-storage/piece_reader.go index d7a3f4e98..882a2a6d9 100644 --- a/extern/sector-storage/piece_reader.go +++ b/extern/sector-storage/piece_reader.go @@ -4,6 +4,7 @@ import ( "bufio" "context" "io" + "sync" "github.com/ipfs/go-cid" "go.opencensus.io/stats" @@ -31,6 +32,7 @@ type pieceReader struct { closed bool seqAt int64 // next byte to be read by io.Reader + mu sync.Mutex r io.ReadCloser br *bufio.Reader rAt int64 @@ -62,6 +64,9 @@ func (p *pieceReader) check() error { } func (p *pieceReader) Close() error { + p.mu.Lock() + defer p.mu.Unlock() + if err := p.check(); err != nil { return err } @@ -84,16 +89,22 @@ func (p *pieceReader) Close() error { } func (p *pieceReader) Read(b []byte) (int, error) { + p.mu.Lock() + defer p.mu.Unlock() + if err := p.check(); err != nil { return 0, err } - n, err := p.ReadAt(b, p.seqAt) + n, err := p.readAtUnlocked(b, p.seqAt) p.seqAt += int64(n) return n, err } func (p *pieceReader) Seek(offset int64, whence int) (int64, error) { + p.mu.Lock() + defer p.mu.Unlock() + if err := p.check(); err != nil { return 0, err } @@ -113,6 +124,13 @@ func (p *pieceReader) Seek(offset int64, whence int) (int64, error) { } func (p *pieceReader) ReadAt(b []byte, off int64) (n int, err error) { + p.mu.Lock() + defer p.mu.Unlock() + + return p.readAtUnlocked(b, off) +} + +func (p *pieceReader) readAtUnlocked(b []byte, off int64) (n int, err error) { if err := p.check(); err != nil { return 0, err }