fix retrieval protocol error by wrapping stream in peeker

This commit is contained in:
whyrusleeping 2019-11-05 08:38:54 -08:00
parent 9a398c6260
commit 4321256992
6 changed files with 19 additions and 23 deletions

View File

@ -279,12 +279,6 @@ func (cg *ChainGen) NextTipSetFromMiners(base *types.TipSet, miners []address.Ad
} }
fts := store.NewFullTipSet(blks) fts := store.NewFullTipSet(blks)
fmt.Println("Made a block: ", fts.TipSet().Cids())
if len(fts.TipSet().Cids()) > 1 {
for _, b := range blks {
fmt.Printf("block %s: %#v\n", b.Cid(), b.Header)
}
}
return &MinedTipSet{ return &MinedTipSet{
TipSet: fts, TipSet: fts,

View File

@ -32,6 +32,8 @@ const CommLen = sectorbuilder.CommitmentBytesLen
type SectorBuilder struct { type SectorBuilder struct {
handle unsafe.Pointer handle unsafe.Pointer
Miner address.Address
} }
type SectorBuilderConfig struct { type SectorBuilderConfig struct {
@ -52,6 +54,7 @@ func New(cfg *SectorBuilderConfig) (*SectorBuilder, error) {
return &SectorBuilder{ return &SectorBuilder{
handle: sbp, handle: sbp,
Miner: cfg.Miner,
}, nil }, nil
} }

View File

@ -23,7 +23,7 @@ func TestSealAndVerify(t *testing.T) {
t.Fatal(err) t.Fatal(err)
} }
sb, cleanup, err := TempSectorbuilder(sectorSize) sb, cleanup, err := sectorbuilder.TempSectorbuilder(sectorSize)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -53,7 +53,7 @@ func TestSealAndVerify(t *testing.T) {
ssinfo := <-store.Incoming() ssinfo := <-store.Incoming()
ok, err := sectorbuilder.VerifySeal(sectorSize, ssinfo.CommR[:], ssinfo.CommD[:], addr, ssinfo.Ticket.TicketBytes[:], ssinfo.SectorID, ssinfo.Proof) ok, err := sectorbuilder.VerifySeal(sectorSize, ssinfo.CommR[:], ssinfo.CommD[:], sb.Miner, ssinfo.Ticket.TicketBytes[:], ssinfo.SectorID, ssinfo.Proof)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }

View File

@ -3,7 +3,6 @@ package retrieval
import ( import (
"fmt" "fmt"
"io" "io"
"io/ioutil"
cbg "github.com/whyrusleeping/cbor-gen" cbg "github.com/whyrusleeping/cbor-gen"
xerrors "golang.org/x/xerrors" xerrors "golang.org/x/xerrors"
@ -403,14 +402,11 @@ func (t *Block) MarshalCBOR(w io.Writer) error {
} }
// t.t.Data ([]uint8) (slice) // t.t.Data ([]uint8) (slice)
dlen := len(t.Data) if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajByteString, uint64(len(t.Data)))); err != nil {
if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajByteString, uint64(dlen))); err != nil {
return err return err
} }
if n, err := w.Write(t.Data); err != nil { if _, err := w.Write(t.Data); err != nil {
return err return err
} else if n != dlen {
return fmt.Errorf("somehow wrote the wrong number of bytes...")
} }
return nil return nil
} }
@ -423,12 +419,7 @@ func (t *Block) UnmarshalCBOR(r io.Reader) error {
return err return err
} }
if maj != cbg.MajArray { if maj != cbg.MajArray {
data, err := ioutil.ReadAll(r) return fmt.Errorf("cbor input should be of type array")
if err != nil {
panic("piss")
}
fmt.Println("STRING DATA: ", string(data))
return fmt.Errorf("cbor input should be of type array (got %d)", maj)
} }
if extra != 2 { if extra != 2 {

View File

@ -10,6 +10,7 @@ import (
"github.com/libp2p/go-libp2p-core/host" "github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/network" "github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer" "github.com/libp2p/go-libp2p-core/peer"
cbg "github.com/whyrusleeping/cbor-gen"
"golang.org/x/xerrors" "golang.org/x/xerrors"
"github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/api"
@ -69,6 +70,7 @@ func (c *Client) Query(ctx context.Context, p discovery.RetrievalPeer, data cid.
type clientStream struct { type clientStream struct {
payapi payapi.PaychAPI payapi payapi.PaychAPI
stream network.Stream stream network.Stream
peeker cbg.BytePeeker
root cid.Cid root cid.Cid
size types.BigInt size types.BigInt
@ -118,6 +120,7 @@ func (c *Client) RetrieveUnixfs(ctx context.Context, root cid.Cid, size uint64,
cst := clientStream{ cst := clientStream{
payapi: c.payapi, payapi: c.payapi,
stream: s, stream: s,
peeker: cbg.GetPeeker(s),
root: root, root: root,
size: types.NewInt(size), size: types.NewInt(size),
@ -174,7 +177,7 @@ func (cst *clientStream) doOneExchange(ctx context.Context, toFetch uint64, out
} }
var resp DealResponse var resp DealResponse
if err := cborrpc.ReadCborRPC(cst.stream, &resp); err != nil { if err := cborrpc.ReadCborRPC(cst.peeker, &resp); err != nil {
log.Error(err) log.Error(err)
return err return err
} }
@ -206,7 +209,7 @@ func (cst *clientStream) fetchBlocks(toFetch uint64, out io.Writer) error {
log.Infof("block %d of %d", i+1, blocksToFetch) log.Infof("block %d of %d", i+1, blocksToFetch)
var block Block var block Block
if err := cborrpc.ReadCborRPC(cst.stream, &block); err != nil { if err := cborrpc.ReadCborRPC(cst.peeker, &block); err != nil {
return xerrors.Errorf("reading fetchBlock response: %w", err) return xerrors.Errorf("reading fetchBlock response: %w", err)
} }

View File

@ -14,14 +14,19 @@ import (
"github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/build" "github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain/address"
"github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/lib/cborrpc" "github.com/filecoin-project/lotus/lib/cborrpc"
"github.com/filecoin-project/lotus/storage/sectorblocks" "github.com/filecoin-project/lotus/storage/sectorblocks"
) )
type RetrMinerApi interface {
PaychVoucherAdd(context.Context, address.Address, *types.SignedVoucher, []byte, types.BigInt) (types.BigInt, error)
}
type Miner struct { type Miner struct {
sectorBlocks *sectorblocks.SectorBlocks sectorBlocks *sectorblocks.SectorBlocks
full api.FullNode full RetrMinerApi
pricePerByte types.BigInt pricePerByte types.BigInt
// TODO: Unseal price // TODO: Unseal price