retrieval: Almost working chunked files

This commit is contained in:
Łukasz Magiera 2019-08-28 01:40:14 +02:00
parent a5d3122ba5
commit d7d42416d7
3 changed files with 56 additions and 19 deletions

View File

@ -31,17 +31,25 @@ func NewMiner(sblks *sectorblocks.SectorBlocks) *Miner {
} }
} }
func writeErr(stream network.Stream, err error) {
log.Errorf("Retrieval deal error: %s", err)
_ = cborrpc.WriteCborRPC(stream, DealResponse{
Status: Error,
Message: err.Error(),
})
}
func (m *Miner) HandleQueryStream(stream network.Stream) { func (m *Miner) HandleQueryStream(stream network.Stream) {
defer stream.Close() defer stream.Close()
var query Query var query Query
if err := cborrpc.ReadCborRPC(stream, &query); err != nil { if err := cborrpc.ReadCborRPC(stream, &query); err != nil {
log.Errorf("Retrieval query: ReadCborRPC: %s", err) writeErr(stream, err)
return return
} }
refs, err := m.sectorBlocks.GetRefs(query.Piece) size, err := m.sectorBlocks.GetSize(query.Piece)
if err != nil { if err != nil && err != sectorblocks.ErrNotFound {
log.Errorf("Retrieval query: GetRefs: %s", err) log.Errorf("Retrieval query: GetRefs: %s", err)
return return
} }
@ -49,12 +57,12 @@ func (m *Miner) HandleQueryStream(stream network.Stream) {
answer := QueryResponse{ answer := QueryResponse{
Status: Unavailable, Status: Unavailable,
} }
if len(refs) > 0 { if err == nil {
answer.Status = Available answer.Status = Available
// TODO: get price, look for already unsealed ref to reduce work // TODO: get price, look for already unsealed ref to reduce work
answer.MinPrice = types.BigMul(types.NewInt(uint64(refs[0].Size)), m.pricePerByte) answer.MinPrice = types.BigMul(types.NewInt(uint64(size)), m.pricePerByte)
answer.Size = uint64(refs[0].Size) // TODO: verify on intermediate answer.Size = uint64(size) // TODO: verify on intermediate
} }
if err := cborrpc.WriteCborRPC(stream, answer); err != nil { if err := cborrpc.WriteCborRPC(stream, answer); err != nil {
@ -63,14 +71,6 @@ func (m *Miner) HandleQueryStream(stream network.Stream) {
} }
} }
func writeErr(stream network.Stream, err error) {
log.Errorf("Retrieval deal error: %s", err)
_ = cborrpc.WriteCborRPC(stream, DealResponse{
Status: Error,
Message: err.Error(),
})
}
func (m *Miner) HandleDealStream(stream network.Stream) { // TODO: should we block in stream handlers func (m *Miner) HandleDealStream(stream network.Stream) { // TODO: should we block in stream handlers
defer stream.Close() defer stream.Close()

View File

@ -2,6 +2,7 @@ package sectorblocks
import ( import (
"context" "context"
"errors"
"github.com/filecoin-project/go-lotus/api" "github.com/filecoin-project/go-lotus/api"
"github.com/filecoin-project/go-lotus/lib/sectorbuilder" "github.com/filecoin-project/go-lotus/lib/sectorbuilder"
"github.com/filecoin-project/go-lotus/node/modules/dtypes" "github.com/filecoin-project/go-lotus/node/modules/dtypes"
@ -9,6 +10,7 @@ import (
"github.com/ipfs/go-datastore/query" "github.com/ipfs/go-datastore/query"
blockstore "github.com/ipfs/go-ipfs-blockstore" blockstore "github.com/ipfs/go-ipfs-blockstore"
ipld "github.com/ipfs/go-ipld-format" ipld "github.com/ipfs/go-ipld-format"
"github.com/ipfs/go-unixfs"
"sync" "sync"
"github.com/ipfs/go-cid" "github.com/ipfs/go-cid"
@ -29,6 +31,8 @@ const (
var dsPrefix = datastore.NewKey("/sealedblocks") var dsPrefix = datastore.NewKey("/sealedblocks")
var imBlocksPrefix = datastore.NewKey("/intermediate") var imBlocksPrefix = datastore.NewKey("/intermediate")
var ErrNotFound = errors.New("not found")
type SectorBlocks struct { type SectorBlocks struct {
*sector.Store *sector.Store
@ -69,14 +73,14 @@ type UnixfsReader interface {
type refStorer struct { type refStorer struct {
blockReader UnixfsReader blockReader UnixfsReader
writeRef func(cid cid.Cid, offset uint64, size uint32) error writeRef func(cid cid.Cid, pieceRef string, offset uint64, size uint32) error
intermediate blockstore.Blockstore intermediate blockstore.Blockstore
pieceRef string pieceRef string
remaining []byte remaining []byte
} }
func (st *SectorBlocks) writeRef(cid cid.Cid, offset uint64, size uint32) error { func (st *SectorBlocks) writeRef(cid cid.Cid, pieceRef string, offset uint64, size uint32) error {
st.keyLk.Lock() // TODO: make this multithreaded st.keyLk.Lock() // TODO: make this multithreaded
defer st.keyLk.Unlock() defer st.keyLk.Unlock()
@ -96,7 +100,7 @@ func (st *SectorBlocks) writeRef(cid cid.Cid, offset uint64, size uint32) error
} }
refs = append(refs, api.SealedRef{ refs = append(refs, api.SealedRef{
Piece: string(SerializationUnixfs0) + cid.String(), Piece: pieceRef,
Offset: offset, Offset: offset,
Size: size, Size: size,
}) })
@ -136,7 +140,7 @@ func (r *refStorer) Read(p []byte) (n int, err error) {
continue continue
} }
if err := r.writeRef(nd.Cid(), offset, uint32(len(data))); err != nil { if err := r.writeRef(nd.Cid(), r.pieceRef, offset, uint32(len(data))); err != nil {
return 0, err return 0, err
} }
@ -196,6 +200,9 @@ func (st *SectorBlocks) List() (map[cid.Cid][]api.SealedRef, error) {
func (st *SectorBlocks) GetRefs(k cid.Cid) ([]api.SealedRef, error) { // TODO: track local sectors func (st *SectorBlocks) GetRefs(k cid.Cid) ([]api.SealedRef, error) { // TODO: track local sectors
ent, err := st.keys.Get(dshelp.CidToDsKey(k)) ent, err := st.keys.Get(dshelp.CidToDsKey(k))
if err == datastore.ErrNotFound {
err = ErrNotFound
}
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -208,6 +215,33 @@ func (st *SectorBlocks) GetRefs(k cid.Cid) ([]api.SealedRef, error) { // TODO: t
return refs, nil return refs, nil
} }
func (st *SectorBlocks) GetSize(k cid.Cid) (uint64, error) {
blk, err := st.intermediate.Get(k)
if err == blockstore.ErrNotFound {
refs, err := st.GetRefs(k)
if err != nil {
return 0, err
}
return uint64(refs[0].Size), nil
}
if err != nil {
return 0, err
}
nd, err := ipld.Decode(blk)
if err != nil {
return 0, err
}
fsn, err := unixfs.ExtractFSNode(nd)
if err != nil {
return 0, err
}
return fsn.FileSize(), nil
}
func (st *SectorBlocks) Has(k cid.Cid) (bool, error) { func (st *SectorBlocks) Has(k cid.Cid) (bool, error) {
// TODO: ensure sector is still there // TODO: ensure sector is still there
return st.keys.Has(dshelp.CidToDsKey(k)) return st.keys.Has(dshelp.CidToDsKey(k))

View File

@ -85,11 +85,14 @@ func (ub *unsealedBlocks) maybeUnseal(ctx context.Context, pieceKey string, appr
log.Infof("Unsealing piece '%s'", pieceKey) log.Infof("Unsealing piece '%s'", pieceKey)
data, err := ub.sb.ReadPieceFromSealedSector(pieceKey) data, err := ub.sb.ReadPieceFromSealedSector(pieceKey)
ub.lk.Lock()
if err != nil { if err != nil {
// TODO: tell subs
log.Error(err)
return nil, err return nil, err
} }
ub.lk.Lock()
ub.unsealed[pieceKey] = data ub.unsealed[pieceKey] = data
close(ub.unsealing[pieceKey]) close(ub.unsealing[pieceKey])
return data, nil return data, nil