diff --git a/retrieval/miner.go b/retrieval/miner.go index c8d8a2ccf..1555836ef 100644 --- a/retrieval/miner.go +++ b/retrieval/miner.go @@ -31,17 +31,25 @@ func NewMiner(sblks *sectorblocks.SectorBlocks) *Miner { } } +func writeErr(stream network.Stream, err error) { + log.Errorf("Retrieval deal error: %s", err) + _ = cborrpc.WriteCborRPC(stream, DealResponse{ + Status: Error, + Message: err.Error(), + }) +} + func (m *Miner) HandleQueryStream(stream network.Stream) { defer stream.Close() var query Query if err := cborrpc.ReadCborRPC(stream, &query); err != nil { - log.Errorf("Retrieval query: ReadCborRPC: %s", err) + writeErr(stream, err) return } - refs, err := m.sectorBlocks.GetRefs(query.Piece) - if err != nil { + size, err := m.sectorBlocks.GetSize(query.Piece) + if err != nil && err != sectorblocks.ErrNotFound { log.Errorf("Retrieval query: GetRefs: %s", err) return } @@ -49,12 +57,12 @@ func (m *Miner) HandleQueryStream(stream network.Stream) { answer := QueryResponse{ Status: Unavailable, } - if len(refs) > 0 { + if err == nil { answer.Status = Available // TODO: get price, look for already unsealed ref to reduce work - answer.MinPrice = types.BigMul(types.NewInt(uint64(refs[0].Size)), m.pricePerByte) - answer.Size = uint64(refs[0].Size) // TODO: verify on intermediate + answer.MinPrice = types.BigMul(types.NewInt(uint64(size)), m.pricePerByte) + answer.Size = uint64(size) // TODO: verify on intermediate } if err := cborrpc.WriteCborRPC(stream, answer); err != nil { @@ -63,14 +71,6 @@ func (m *Miner) HandleQueryStream(stream network.Stream) { } } -func writeErr(stream network.Stream, err error) { - log.Errorf("Retrieval deal error: %s", err) - _ = cborrpc.WriteCborRPC(stream, DealResponse{ - Status: Error, - Message: err.Error(), - }) -} - func (m *Miner) HandleDealStream(stream network.Stream) { // TODO: should we block in stream handlers defer stream.Close() diff --git a/storage/sectorblocks/blocks.go b/storage/sectorblocks/blocks.go index cebb8132c..90d691660 100644 --- a/storage/sectorblocks/blocks.go +++ b/storage/sectorblocks/blocks.go @@ -2,6 +2,7 @@ package sectorblocks import ( "context" + "errors" "github.com/filecoin-project/go-lotus/api" "github.com/filecoin-project/go-lotus/lib/sectorbuilder" "github.com/filecoin-project/go-lotus/node/modules/dtypes" @@ -9,6 +10,7 @@ import ( "github.com/ipfs/go-datastore/query" blockstore "github.com/ipfs/go-ipfs-blockstore" ipld "github.com/ipfs/go-ipld-format" + "github.com/ipfs/go-unixfs" "sync" "github.com/ipfs/go-cid" @@ -29,6 +31,8 @@ const ( var dsPrefix = datastore.NewKey("/sealedblocks") var imBlocksPrefix = datastore.NewKey("/intermediate") +var ErrNotFound = errors.New("not found") + type SectorBlocks struct { *sector.Store @@ -69,14 +73,14 @@ type UnixfsReader interface { type refStorer struct { blockReader UnixfsReader - writeRef func(cid cid.Cid, offset uint64, size uint32) error + writeRef func(cid cid.Cid, pieceRef string, offset uint64, size uint32) error intermediate blockstore.Blockstore pieceRef string remaining []byte } -func (st *SectorBlocks) writeRef(cid cid.Cid, offset uint64, size uint32) error { +func (st *SectorBlocks) writeRef(cid cid.Cid, pieceRef string, offset uint64, size uint32) error { st.keyLk.Lock() // TODO: make this multithreaded defer st.keyLk.Unlock() @@ -96,7 +100,7 @@ func (st *SectorBlocks) writeRef(cid cid.Cid, offset uint64, size uint32) error } refs = append(refs, api.SealedRef{ - Piece: string(SerializationUnixfs0) + cid.String(), + Piece: pieceRef, Offset: offset, Size: size, }) @@ -136,7 +140,7 @@ func (r *refStorer) Read(p []byte) (n int, err error) { continue } - if err := r.writeRef(nd.Cid(), offset, uint32(len(data))); err != nil { + if err := r.writeRef(nd.Cid(), r.pieceRef, offset, uint32(len(data))); err != nil { return 0, err } @@ -196,6 +200,9 @@ func (st *SectorBlocks) List() (map[cid.Cid][]api.SealedRef, error) { func (st *SectorBlocks) GetRefs(k cid.Cid) ([]api.SealedRef, error) { // TODO: track local sectors ent, err := st.keys.Get(dshelp.CidToDsKey(k)) + if err == datastore.ErrNotFound { + err = ErrNotFound + } if err != nil { return nil, err } @@ -208,6 +215,33 @@ func (st *SectorBlocks) GetRefs(k cid.Cid) ([]api.SealedRef, error) { // TODO: t return refs, nil } +func (st *SectorBlocks) GetSize(k cid.Cid) (uint64, error) { + blk, err := st.intermediate.Get(k) + if err == blockstore.ErrNotFound { + refs, err := st.GetRefs(k) + if err != nil { + return 0, err + } + + return uint64(refs[0].Size), nil + } + if err != nil { + return 0, err + } + + nd, err := ipld.Decode(blk) + if err != nil { + return 0, err + } + + fsn, err := unixfs.ExtractFSNode(nd) + if err != nil { + return 0, err + } + + return fsn.FileSize(), nil +} + func (st *SectorBlocks) Has(k cid.Cid) (bool, error) { // TODO: ensure sector is still there return st.keys.Has(dshelp.CidToDsKey(k)) diff --git a/storage/sectorblocks/unsealed.go b/storage/sectorblocks/unsealed.go index d211910d3..d532b82ab 100644 --- a/storage/sectorblocks/unsealed.go +++ b/storage/sectorblocks/unsealed.go @@ -85,11 +85,14 @@ func (ub *unsealedBlocks) maybeUnseal(ctx context.Context, pieceKey string, appr log.Infof("Unsealing piece '%s'", pieceKey) data, err := ub.sb.ReadPieceFromSealedSector(pieceKey) + ub.lk.Lock() + if err != nil { + // TODO: tell subs + log.Error(err) return nil, err } - ub.lk.Lock() ub.unsealed[pieceKey] = data close(ub.unsealing[pieceKey]) return data, nil