diff --git a/retrieval/miner.go b/retrieval/miner.go index 1555836ef..ec04c0dc8 100644 --- a/retrieval/miner.go +++ b/retrieval/miner.go @@ -71,127 +71,151 @@ func (m *Miner) HandleQueryStream(stream network.Stream) { } } +type handlerDeal struct { + m *Miner + stream network.Stream + + ufsr sectorblocks.UnixfsReader + open cid.Cid + at uint64 + size uint64 +} + func (m *Miner) HandleDealStream(stream network.Stream) { // TODO: should we block in stream handlers defer stream.Close() - var ufsr sectorblocks.UnixfsReader - var open cid.Cid - var at uint64 - var size uint64 + hnd := &handlerDeal{ + m: m, + + stream: stream, + } for { - var deal Deal - if err := cborrpc.ReadCborRPC(stream, &deal); err != nil { + err := hnd.handleNext() // TODO: 'more' bool + if err != nil { + writeErr(stream, err) return } - - if deal.Unixfs0 == nil { - writeErr(stream, xerrors.New("unknown deal type")) - return - } - - // TODO: Verify payment, check how much we can send based on that - // Or reject (possibly returning the payment to retain reputation with the client) - - bstore := m.sectorBlocks.SealedBlockstore(func() error { - return nil // TODO: approve unsealing based on amount paid - }) - - if open != deal.Unixfs0.Root || at != deal.Unixfs0.Offset { - if deal.Unixfs0.Offset != 0 { - // TODO: Implement SeekBlock (like ReadBlock) in go-unixfs - writeErr(stream, xerrors.New("sending merkle proofs for nonzero offset not supported yet")) - return - } - at = deal.Unixfs0.Offset - - ds := merkledag.NewDAGService(blockservice.New(bstore, nil)) - rootNd, err := ds.Get(context.TODO(), deal.Unixfs0.Root) - if err != nil { - writeErr(stream, err) - return - } - - fsr, err := unixfile.NewUnixfsFile(context.TODO(), ds, rootNd) - if err != nil { - writeErr(stream, err) - return - } - - var ok bool - ufsr, ok = fsr.(sectorblocks.UnixfsReader) - if !ok { - writeErr(stream, xerrors.Errorf("file %s didn't implement sectorblocks.UnixfsReader", deal.Unixfs0.Root)) - return - } - - isize, err := ufsr.Size() - if err != nil { - writeErr(stream, err) - return - } - size = uint64(isize) - } - - if deal.Unixfs0.Offset+deal.Unixfs0.Size > size { - writeErr(stream, xerrors.Errorf("tried to read too much %d+%d > %d", deal.Unixfs0.Offset, deal.Unixfs0.Size, size)) - return - } - - resp := DealResponse{ - Status: Accepted, - } - if err := cborrpc.WriteCborRPC(stream, resp); err != nil { - log.Errorf("Retrieval query: Write Accepted resp: %s", err) - return - } - - buf := make([]byte, network.MessageSizeMax) - msgw := msgio.NewVarintWriter(stream) - - blocksToSend := (deal.Unixfs0.Size + build.UnixfsChunkSize - 1) / build.UnixfsChunkSize - for i := uint64(0); i < blocksToSend; { - data, offset, nd, err := ufsr.ReadBlock(context.TODO()) - if err != nil { - writeErr(stream, err) - return - } - - log.Infof("sending block for a deal: %s", nd.Cid()) - - if offset != deal.Unixfs0.Offset { - writeErr(stream, xerrors.Errorf("ReadBlock on wrong offset: want %d, got %d", deal.Unixfs0.Offset, offset)) - return - } - - /*if uint64(len(data)) != deal.Unixfs0.Size { // TODO: Fix for internal nodes (and any other node too) - writeErr(stream, xerrors.Errorf("ReadBlock data with wrong size: want %d, got %d", deal.Unixfs0.Size, len(data))) - return - }*/ - - block := pb.Message_Block{ - Prefix: nd.Cid().Prefix().Bytes(), - Data: nd.RawData(), - } - - n, err := block.MarshalTo(buf) - if err != nil { - writeErr(stream, err) - return - } - - if err := msgw.WriteMsg(buf[:n]); err != nil { - log.Error(err) - return - } - - if len(data) > 0 { // don't count internal nodes - i++ - } - } - - // TODO: set `at` - } } + +func (hnd *handlerDeal) handleNext() error { + var deal Deal + if err := cborrpc.ReadCborRPC(hnd.stream, &deal); err != nil { + return err + } + + if deal.Unixfs0 == nil { + return xerrors.New("unknown deal type") + } + + // TODO: Verify payment, check how much we can send based on that + // Or reject (possibly returning the payment to retain reputation with the client) + + if hnd.open != deal.Unixfs0.Root || hnd.at != deal.Unixfs0.Offset { + log.Infof("opening file for sending (open '%s') (@%d, want %d)", hnd.open, hnd.at, deal.Unixfs0.Offset) + if err := hnd.openFile(deal); err != nil { + return err + } + } + + if deal.Unixfs0.Offset+deal.Unixfs0.Size > hnd.size { + return xerrors.Errorf("tried to read too much %d+%d > %d", deal.Unixfs0.Offset, deal.Unixfs0.Size, hnd.size) + } + + return hnd.accept(deal) +} + +func (hnd *handlerDeal) openFile(deal Deal) error { + if deal.Unixfs0.Offset != 0 { + // TODO: Implement SeekBlock (like ReadBlock) in go-unixfs + return xerrors.New("sending merkle proofs for nonzero offset not supported yet") + } + hnd.at = deal.Unixfs0.Offset + + bstore := hnd.m.sectorBlocks.SealedBlockstore(func() error { + return nil // TODO: approve unsealing based on amount paid + }) + + ds := merkledag.NewDAGService(blockservice.New(bstore, nil)) + rootNd, err := ds.Get(context.TODO(), deal.Unixfs0.Root) + if err != nil { + return err + } + + fsr, err := unixfile.NewUnixfsFile(context.TODO(), ds, rootNd) + if err != nil { + return err + } + + var ok bool + hnd.ufsr, ok = fsr.(sectorblocks.UnixfsReader) + if !ok { + return xerrors.Errorf("file %s didn't implement sectorblocks.UnixfsReader", deal.Unixfs0.Root) + } + + isize, err := hnd.ufsr.Size() + if err != nil { + return err + } + hnd.size = uint64(isize) + + hnd.open = deal.Unixfs0.Root + + return nil +} + +func (hnd *handlerDeal) accept(deal Deal) error { + resp := DealResponse{ + Status: Accepted, + } + if err := cborrpc.WriteCborRPC(hnd.stream, resp); err != nil { + log.Errorf("Retrieval query: Write Accepted resp: %s", err) + return err + } + + buf := make([]byte, network.MessageSizeMax) + msgw := msgio.NewVarintWriter(hnd.stream) + + blocksToSend := (deal.Unixfs0.Size + build.UnixfsChunkSize - 1) / build.UnixfsChunkSize + for i := uint64(0); i < blocksToSend; { + data, offset, nd, err := hnd.ufsr.ReadBlock(context.TODO()) + if err != nil { + return err + } + + log.Infof("sending block for a deal: %s", nd.Cid()) + + if offset != deal.Unixfs0.Offset { + return xerrors.Errorf("ReadBlock on wrong offset: want %d, got %d", deal.Unixfs0.Offset, offset) + } + + /*if uint64(len(data)) != deal.Unixfs0.Size { // TODO: Fix for internal nodes (and any other node too) + writeErr(stream, xerrors.Errorf("ReadBlock data with wrong size: want %d, got %d", deal.Unixfs0.Size, len(data))) + return + }*/ + + block := pb.Message_Block{ + Prefix: nd.Cid().Prefix().Bytes(), + Data: nd.RawData(), + } + + n, err := block.MarshalTo(buf) + if err != nil { + return err + } + + if err := msgw.WriteMsg(buf[:n]); err != nil { + log.Error(err) + return err + } + + if len(data) > 0 { // don't count internal nodes + hnd.at += uint64(len(data)) + i++ + } + } + + return nil +}