lotus/retrieval/miner.go

246 lines
5.9 KiB
Go
Raw Normal View History

2019-08-26 13:45:36 +00:00
package retrieval
import (
2019-08-27 18:45:21 +00:00
"context"
2019-08-28 23:01:28 +00:00
"io"
2019-08-26 13:45:36 +00:00
2019-08-27 18:45:21 +00:00
"github.com/ipfs/go-blockservice"
2019-08-29 11:31:25 +00:00
"github.com/ipfs/go-cid"
2019-08-27 18:45:21 +00:00
"github.com/ipfs/go-merkledag"
unixfile "github.com/ipfs/go-unixfs/file"
"github.com/libp2p/go-libp2p-core/network"
2019-08-29 11:31:25 +00:00
"golang.org/x/xerrors"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain/address"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/lib/cborrpc"
"github.com/filecoin-project/lotus/storage/sectorblocks"
2019-08-26 13:45:36 +00:00
)
// RetrMinerApi is the narrow slice of the node API that the retrieval miner
// depends on. Within this file it is only used by handleNext, which submits
// the payment voucher that accompanies a deal proposal.
type RetrMinerApi interface {
	// PaychVoucherAdd is called with the deal's payment channel, its single
	// signed voucher, a nil byte slice and the expected payment.
	// NOTE(review): the parameter names are descriptive only and inferred from
	// that call site — confirm them against the full node API definition.
	PaychVoucherAdd(ctx context.Context, ch address.Address, sv *types.SignedVoucher, proof []byte, minDelta types.BigInt) (types.BigInt, error)
}
2019-08-26 13:45:36 +00:00
type Miner struct {
sectorBlocks *sectorblocks.SectorBlocks
full RetrMinerApi
2019-08-27 18:45:21 +00:00
pricePerByte types.BigInt
// TODO: Unseal price
2019-08-26 13:45:36 +00:00
}
2019-09-16 21:25:23 +00:00
func NewMiner(sblks *sectorblocks.SectorBlocks, full api.FullNode) *Miner {
2019-08-26 13:45:36 +00:00
return &Miner{
sectorBlocks: sblks,
2019-09-16 21:25:23 +00:00
full: full,
2019-08-27 18:45:21 +00:00
pricePerByte: types.NewInt(2), // TODO: allow setting
2019-08-26 13:45:36 +00:00
}
}
// writeErr logs err and then reports it to the remote peer as a DealResponse
// with status Error and the error text as its message.
func writeErr(stream network.Stream, err error) {
	log.Errorf("Retrieval deal error: %s", err)

	resp := DealResponse{
		Status:  Error,
		Message: err.Error(),
	}
	// Best effort only: the callers in this file return right after calling
	// writeErr, so a failed write is deliberately ignored.
	_ = cborrpc.WriteCborRPC(stream, &resp)
}
2019-08-27 18:45:21 +00:00
func (m *Miner) HandleQueryStream(stream network.Stream) {
2019-08-26 13:45:36 +00:00
defer stream.Close()
2019-08-26 18:23:11 +00:00
var query Query
2019-08-26 13:45:36 +00:00
if err := cborrpc.ReadCborRPC(stream, &query); err != nil {
writeErr(stream, err)
2019-08-26 13:45:36 +00:00
return
}
size, err := m.sectorBlocks.GetSize(query.Piece)
if err != nil && err != sectorblocks.ErrNotFound {
2019-08-26 13:45:36 +00:00
log.Errorf("Retrieval query: GetRefs: %s", err)
return
}
answer := &QueryResponse{
2019-08-26 13:45:36 +00:00
Status: Unavailable,
}
if err == nil {
2019-08-26 13:45:36 +00:00
answer.Status = Available
// TODO: get price, look for already unsealed ref to reduce work
answer.MinPrice = types.BigMul(types.NewInt(uint64(size)), m.pricePerByte)
answer.Size = uint64(size) // TODO: verify on intermediate
2019-08-26 13:45:36 +00:00
}
if err := cborrpc.WriteCborRPC(stream, answer); err != nil {
log.Errorf("Retrieval query: WriteCborRPC: %s", err)
return
}
}
2019-08-27 18:45:21 +00:00
type handlerDeal struct {
2019-08-28 23:01:38 +00:00
m *Miner
stream network.Stream
ufsr sectorblocks.UnixfsReader
open cid.Cid
2019-08-28 23:01:38 +00:00
at uint64
size uint64
}
2019-08-29 15:09:34 +00:00
func (m *Miner) HandleDealStream(stream network.Stream) {
2019-08-27 18:45:21 +00:00
defer stream.Close()
hnd := &handlerDeal{
m: m,
stream: stream,
}
2019-08-27 18:45:21 +00:00
2019-08-28 23:01:28 +00:00
var err error
more := true
for more {
more, err = hnd.handleNext() // TODO: 'more' bool
if err != nil {
writeErr(stream, err)
2019-08-27 19:54:39 +00:00
return
2019-08-27 18:45:21 +00:00
}
}
2019-08-27 19:54:39 +00:00
}
2019-08-28 23:01:28 +00:00
func (hnd *handlerDeal) handleNext() (bool, error) {
var deal DealProposal
if err := cborrpc.ReadCborRPC(hnd.stream, &deal); err != nil {
2019-08-28 23:01:28 +00:00
if err == io.EOF { // client sent all deals
err = nil
}
return false, err
}
if deal.Params.Unixfs0 == nil {
2019-08-28 23:01:28 +00:00
return false, xerrors.New("unknown deal type")
}
unixfs0 := deal.Params.Unixfs0
2019-09-24 21:13:47 +00:00
if len(deal.Payment.Vouchers) != 1 {
return false, xerrors.Errorf("expected one signed voucher, got %d", len(deal.Payment.Vouchers))
}
2019-09-16 21:25:23 +00:00
expPayment := types.BigMul(hnd.m.pricePerByte, types.NewInt(deal.Params.Unixfs0.Size))
2019-09-24 21:13:47 +00:00
if _, err := hnd.m.full.PaychVoucherAdd(context.TODO(), deal.Payment.Channel, deal.Payment.Vouchers[0], nil, expPayment); err != nil {
2019-09-16 21:25:23 +00:00
return false, xerrors.Errorf("processing retrieval payment: %w", err)
}
2019-08-27 18:45:21 +00:00
// If the file isn't open (new deal stream), isn't the right file, or isn't
// at the right offset, (re)open it
if hnd.open != deal.Ref || hnd.at != unixfs0.Offset {
log.Infof("opening file for sending (open '%s') (@%d, want %d)", deal.Ref, hnd.at, unixfs0.Offset)
if err := hnd.openFile(deal); err != nil {
2019-08-28 23:01:28 +00:00
return false, err
2019-08-27 18:45:21 +00:00
}
}
2019-08-27 18:45:21 +00:00
if unixfs0.Offset+unixfs0.Size > hnd.size {
return false, xerrors.Errorf("tried to read too much %d+%d > %d", unixfs0.Offset, unixfs0.Size, hnd.size)
}
2019-08-28 23:01:28 +00:00
err := hnd.accept(deal)
if err != nil {
return false, err
}
return true, nil
}
// openFile opens the file referenced by deal.Ref for reading and records it
// in the handler: the read position, the reader, the file size and the opened
// CID. Only deals starting at offset zero are supported.
func (hnd *handlerDeal) openFile(deal DealProposal) error {
	params := deal.Params.Unixfs0
	if params.Offset != 0 {
		// TODO: Implement SeekBlock (like ReadBlock) in go-unixfs
		return xerrors.New("sending merkle proofs for nonzero offset not supported yet")
	}
	hnd.at = params.Offset

	// TODO: approve unsealing based on amount paid
	approveUnseal := func() error { return nil }
	sealed := hnd.m.sectorBlocks.SealedBlockstore(approveUnseal)
	dag := merkledag.NewDAGService(blockservice.New(sealed, nil))

	root, err := dag.Get(context.TODO(), deal.Ref)
	if err != nil {
		return err
	}

	file, err := unixfile.NewUnixfsFile(context.TODO(), dag, root)
	if err != nil {
		return err
	}

	reader, ok := file.(sectorblocks.UnixfsReader)
	hnd.ufsr = reader
	if !ok {
		return xerrors.Errorf("file %s didn't implement sectorblocks.UnixfsReader", deal.Ref)
	}

	total, err := reader.Size()
	if err != nil {
		return err
	}

	hnd.size = uint64(total)
	hnd.open = deal.Ref
	return nil
}
func (hnd *handlerDeal) accept(deal DealProposal) error {
unixfs0 := deal.Params.Unixfs0
resp := &DealResponse{
Status: Accepted,
}
if err := cborrpc.WriteCborRPC(hnd.stream, resp); err != nil {
log.Errorf("Retrieval query: Write Accepted resp: %s", err)
return err
}
blocksToSend := (unixfs0.Size + build.UnixfsChunkSize - 1) / build.UnixfsChunkSize
for i := uint64(0); i < blocksToSend; {
data, offset, nd, err := hnd.ufsr.ReadBlock(context.TODO())
if err != nil {
return err
2019-08-27 18:45:21 +00:00
}
log.Infof("sending block for a deal: %s", nd.Cid())
if offset != unixfs0.Offset {
return xerrors.Errorf("ReadBlock on wrong offset: want %d, got %d", unixfs0.Offset, offset)
2019-08-27 18:45:21 +00:00
}
/*if uint64(len(data)) != deal.Unixfs0.Size { // TODO: Fix for internal nodes (and any other node too)
writeErr(stream, xerrors.Errorf("ReadBlock data with wrong size: want %d, got %d", deal.Unixfs0.Size, len(data)))
2019-08-27 18:45:21 +00:00
return
}*/
block := &Block{
Prefix: nd.Cid().Prefix().Bytes(),
Data: nd.RawData(),
2019-08-27 18:45:21 +00:00
}
2019-08-29 11:31:25 +00:00
if err := cborrpc.WriteCborRPC(hnd.stream, block); err != nil {
return err
}
2019-08-27 18:45:21 +00:00
if len(data) > 0 { // don't count internal nodes
hnd.at += uint64(len(data))
i++
}
2019-08-27 18:45:21 +00:00
}
return nil
2019-08-27 18:45:21 +00:00
}