retrieval: make handler work with chunked files
This commit is contained in:
parent
a6914309a6
commit
483f06b329
@ -71,127 +71,151 @@ func (m *Miner) HandleQueryStream(stream network.Stream) {
|
||||
}
|
||||
}
|
||||
|
||||
type handlerDeal struct {
|
||||
m *Miner
|
||||
stream network.Stream
|
||||
|
||||
ufsr sectorblocks.UnixfsReader
|
||||
open cid.Cid
|
||||
at uint64
|
||||
size uint64
|
||||
}
|
||||
|
||||
func (m *Miner) HandleDealStream(stream network.Stream) { // TODO: should we block in stream handlers
|
||||
defer stream.Close()
|
||||
|
||||
var ufsr sectorblocks.UnixfsReader
|
||||
var open cid.Cid
|
||||
var at uint64
|
||||
var size uint64
|
||||
hnd := &handlerDeal{
|
||||
m: m,
|
||||
|
||||
stream: stream,
|
||||
}
|
||||
|
||||
for {
|
||||
var deal Deal
|
||||
if err := cborrpc.ReadCborRPC(stream, &deal); err != nil {
|
||||
err := hnd.handleNext() // TODO: 'more' bool
|
||||
if err != nil {
|
||||
writeErr(stream, err)
|
||||
return
|
||||
}
|
||||
|
||||
if deal.Unixfs0 == nil {
|
||||
writeErr(stream, xerrors.New("unknown deal type"))
|
||||
return
|
||||
}
|
||||
|
||||
// TODO: Verify payment, check how much we can send based on that
|
||||
// Or reject (possibly returning the payment to retain reputation with the client)
|
||||
|
||||
bstore := m.sectorBlocks.SealedBlockstore(func() error {
|
||||
return nil // TODO: approve unsealing based on amount paid
|
||||
})
|
||||
|
||||
if open != deal.Unixfs0.Root || at != deal.Unixfs0.Offset {
|
||||
if deal.Unixfs0.Offset != 0 {
|
||||
// TODO: Implement SeekBlock (like ReadBlock) in go-unixfs
|
||||
writeErr(stream, xerrors.New("sending merkle proofs for nonzero offset not supported yet"))
|
||||
return
|
||||
}
|
||||
at = deal.Unixfs0.Offset
|
||||
|
||||
ds := merkledag.NewDAGService(blockservice.New(bstore, nil))
|
||||
rootNd, err := ds.Get(context.TODO(), deal.Unixfs0.Root)
|
||||
if err != nil {
|
||||
writeErr(stream, err)
|
||||
return
|
||||
}
|
||||
|
||||
fsr, err := unixfile.NewUnixfsFile(context.TODO(), ds, rootNd)
|
||||
if err != nil {
|
||||
writeErr(stream, err)
|
||||
return
|
||||
}
|
||||
|
||||
var ok bool
|
||||
ufsr, ok = fsr.(sectorblocks.UnixfsReader)
|
||||
if !ok {
|
||||
writeErr(stream, xerrors.Errorf("file %s didn't implement sectorblocks.UnixfsReader", deal.Unixfs0.Root))
|
||||
return
|
||||
}
|
||||
|
||||
isize, err := ufsr.Size()
|
||||
if err != nil {
|
||||
writeErr(stream, err)
|
||||
return
|
||||
}
|
||||
size = uint64(isize)
|
||||
}
|
||||
|
||||
if deal.Unixfs0.Offset+deal.Unixfs0.Size > size {
|
||||
writeErr(stream, xerrors.Errorf("tried to read too much %d+%d > %d", deal.Unixfs0.Offset, deal.Unixfs0.Size, size))
|
||||
return
|
||||
}
|
||||
|
||||
resp := DealResponse{
|
||||
Status: Accepted,
|
||||
}
|
||||
if err := cborrpc.WriteCborRPC(stream, resp); err != nil {
|
||||
log.Errorf("Retrieval query: Write Accepted resp: %s", err)
|
||||
return
|
||||
}
|
||||
|
||||
buf := make([]byte, network.MessageSizeMax)
|
||||
msgw := msgio.NewVarintWriter(stream)
|
||||
|
||||
blocksToSend := (deal.Unixfs0.Size + build.UnixfsChunkSize - 1) / build.UnixfsChunkSize
|
||||
for i := uint64(0); i < blocksToSend; {
|
||||
data, offset, nd, err := ufsr.ReadBlock(context.TODO())
|
||||
if err != nil {
|
||||
writeErr(stream, err)
|
||||
return
|
||||
}
|
||||
|
||||
log.Infof("sending block for a deal: %s", nd.Cid())
|
||||
|
||||
if offset != deal.Unixfs0.Offset {
|
||||
writeErr(stream, xerrors.Errorf("ReadBlock on wrong offset: want %d, got %d", deal.Unixfs0.Offset, offset))
|
||||
return
|
||||
}
|
||||
|
||||
/*if uint64(len(data)) != deal.Unixfs0.Size { // TODO: Fix for internal nodes (and any other node too)
|
||||
writeErr(stream, xerrors.Errorf("ReadBlock data with wrong size: want %d, got %d", deal.Unixfs0.Size, len(data)))
|
||||
return
|
||||
}*/
|
||||
|
||||
block := pb.Message_Block{
|
||||
Prefix: nd.Cid().Prefix().Bytes(),
|
||||
Data: nd.RawData(),
|
||||
}
|
||||
|
||||
n, err := block.MarshalTo(buf)
|
||||
if err != nil {
|
||||
writeErr(stream, err)
|
||||
return
|
||||
}
|
||||
|
||||
if err := msgw.WriteMsg(buf[:n]); err != nil {
|
||||
log.Error(err)
|
||||
return
|
||||
}
|
||||
|
||||
if len(data) > 0 { // don't count internal nodes
|
||||
i++
|
||||
}
|
||||
}
|
||||
|
||||
// TODO: set `at`
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func (hnd *handlerDeal) handleNext() error {
|
||||
var deal Deal
|
||||
if err := cborrpc.ReadCborRPC(hnd.stream, &deal); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if deal.Unixfs0 == nil {
|
||||
return xerrors.New("unknown deal type")
|
||||
}
|
||||
|
||||
// TODO: Verify payment, check how much we can send based on that
|
||||
// Or reject (possibly returning the payment to retain reputation with the client)
|
||||
|
||||
if hnd.open != deal.Unixfs0.Root || hnd.at != deal.Unixfs0.Offset {
|
||||
log.Infof("opening file for sending (open '%s') (@%d, want %d)", hnd.open, hnd.at, deal.Unixfs0.Offset)
|
||||
if err := hnd.openFile(deal); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
if deal.Unixfs0.Offset+deal.Unixfs0.Size > hnd.size {
|
||||
return xerrors.Errorf("tried to read too much %d+%d > %d", deal.Unixfs0.Offset, deal.Unixfs0.Size, hnd.size)
|
||||
}
|
||||
|
||||
return hnd.accept(deal)
|
||||
}
|
||||
|
||||
func (hnd *handlerDeal) openFile(deal Deal) error {
|
||||
if deal.Unixfs0.Offset != 0 {
|
||||
// TODO: Implement SeekBlock (like ReadBlock) in go-unixfs
|
||||
return xerrors.New("sending merkle proofs for nonzero offset not supported yet")
|
||||
}
|
||||
hnd.at = deal.Unixfs0.Offset
|
||||
|
||||
bstore := hnd.m.sectorBlocks.SealedBlockstore(func() error {
|
||||
return nil // TODO: approve unsealing based on amount paid
|
||||
})
|
||||
|
||||
ds := merkledag.NewDAGService(blockservice.New(bstore, nil))
|
||||
rootNd, err := ds.Get(context.TODO(), deal.Unixfs0.Root)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
fsr, err := unixfile.NewUnixfsFile(context.TODO(), ds, rootNd)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
var ok bool
|
||||
hnd.ufsr, ok = fsr.(sectorblocks.UnixfsReader)
|
||||
if !ok {
|
||||
return xerrors.Errorf("file %s didn't implement sectorblocks.UnixfsReader", deal.Unixfs0.Root)
|
||||
}
|
||||
|
||||
isize, err := hnd.ufsr.Size()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
hnd.size = uint64(isize)
|
||||
|
||||
hnd.open = deal.Unixfs0.Root
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (hnd *handlerDeal) accept(deal Deal) error {
|
||||
resp := DealResponse{
|
||||
Status: Accepted,
|
||||
}
|
||||
if err := cborrpc.WriteCborRPC(hnd.stream, resp); err != nil {
|
||||
log.Errorf("Retrieval query: Write Accepted resp: %s", err)
|
||||
return err
|
||||
}
|
||||
|
||||
buf := make([]byte, network.MessageSizeMax)
|
||||
msgw := msgio.NewVarintWriter(hnd.stream)
|
||||
|
||||
blocksToSend := (deal.Unixfs0.Size + build.UnixfsChunkSize - 1) / build.UnixfsChunkSize
|
||||
for i := uint64(0); i < blocksToSend; {
|
||||
data, offset, nd, err := hnd.ufsr.ReadBlock(context.TODO())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
log.Infof("sending block for a deal: %s", nd.Cid())
|
||||
|
||||
if offset != deal.Unixfs0.Offset {
|
||||
return xerrors.Errorf("ReadBlock on wrong offset: want %d, got %d", deal.Unixfs0.Offset, offset)
|
||||
}
|
||||
|
||||
/*if uint64(len(data)) != deal.Unixfs0.Size { // TODO: Fix for internal nodes (and any other node too)
|
||||
writeErr(stream, xerrors.Errorf("ReadBlock data with wrong size: want %d, got %d", deal.Unixfs0.Size, len(data)))
|
||||
return
|
||||
}*/
|
||||
|
||||
block := pb.Message_Block{
|
||||
Prefix: nd.Cid().Prefix().Bytes(),
|
||||
Data: nd.RawData(),
|
||||
}
|
||||
|
||||
n, err := block.MarshalTo(buf)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := msgw.WriteMsg(buf[:n]); err != nil {
|
||||
log.Error(err)
|
||||
return err
|
||||
}
|
||||
|
||||
if len(data) > 0 { // don't count internal nodes
|
||||
hnd.at += uint64(len(data))
|
||||
i++
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user