From 432125699279fe74ac473fd6ef8c55e4b62f1107 Mon Sep 17 00:00:00 2001 From: whyrusleeping Date: Tue, 5 Nov 2019 08:38:54 -0800 Subject: [PATCH] fix retrieval protocol error by wrapping stream in peeker --- chain/gen/gen.go | 6 ------ lib/sectorbuilder/sectorbuilder.go | 3 +++ lib/sectorbuilder/sectorbuilder_test.go | 4 ++-- retrieval/cbor_gen.go | 15 +++------------ retrieval/client.go | 7 +++++-- retrieval/miner.go | 7 ++++++- 6 files changed, 19 insertions(+), 23 deletions(-) diff --git a/chain/gen/gen.go b/chain/gen/gen.go index e1452bb91..0d951f1fe 100644 --- a/chain/gen/gen.go +++ b/chain/gen/gen.go @@ -279,12 +279,6 @@ func (cg *ChainGen) NextTipSetFromMiners(base *types.TipSet, miners []address.Ad } fts := store.NewFullTipSet(blks) - fmt.Println("Made a block: ", fts.TipSet().Cids()) - if len(fts.TipSet().Cids()) > 1 { - for _, b := range blks { - fmt.Printf("block %s: %#v\n", b.Cid(), b.Header) - } - } return &MinedTipSet{ TipSet: fts, diff --git a/lib/sectorbuilder/sectorbuilder.go b/lib/sectorbuilder/sectorbuilder.go index a2e4499b7..bf55f6927 100644 --- a/lib/sectorbuilder/sectorbuilder.go +++ b/lib/sectorbuilder/sectorbuilder.go @@ -32,6 +32,8 @@ const CommLen = sectorbuilder.CommitmentBytesLen type SectorBuilder struct { handle unsafe.Pointer + + Miner address.Address } type SectorBuilderConfig struct { @@ -52,6 +54,7 @@ func New(cfg *SectorBuilderConfig) (*SectorBuilder, error) { return &SectorBuilder{ handle: sbp, + Miner: cfg.Miner, }, nil } diff --git a/lib/sectorbuilder/sectorbuilder_test.go b/lib/sectorbuilder/sectorbuilder_test.go index 903373cc0..8122ad9ac 100644 --- a/lib/sectorbuilder/sectorbuilder_test.go +++ b/lib/sectorbuilder/sectorbuilder_test.go @@ -23,7 +23,7 @@ func TestSealAndVerify(t *testing.T) { t.Fatal(err) } - sb, cleanup, err := TempSectorbuilder(sectorSize) + sb, cleanup, err := sectorbuilder.TempSectorbuilder(sectorSize) if err != nil { t.Fatal(err) } @@ -53,7 +53,7 @@ func TestSealAndVerify(t *testing.T) { ssinfo := <-store.Incoming() - ok, err := sectorbuilder.VerifySeal(sectorSize, ssinfo.CommR[:], ssinfo.CommD[:], addr, ssinfo.Ticket.TicketBytes[:], ssinfo.SectorID, ssinfo.Proof) + ok, err := sectorbuilder.VerifySeal(sectorSize, ssinfo.CommR[:], ssinfo.CommD[:], sb.Miner, ssinfo.Ticket.TicketBytes[:], ssinfo.SectorID, ssinfo.Proof) if err != nil { t.Fatal(err) } diff --git a/retrieval/cbor_gen.go b/retrieval/cbor_gen.go index 93d800697..3feb15dd1 100644 --- a/retrieval/cbor_gen.go +++ b/retrieval/cbor_gen.go @@ -3,7 +3,6 @@ package retrieval import ( "fmt" "io" - "io/ioutil" cbg "github.com/whyrusleeping/cbor-gen" xerrors "golang.org/x/xerrors" @@ -403,14 +402,11 @@ func (t *Block) MarshalCBOR(w io.Writer) error { } // t.t.Data ([]uint8) (slice) - dlen := len(t.Data) - if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajByteString, uint64(dlen))); err != nil { + if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajByteString, uint64(len(t.Data)))); err != nil { return err } - if n, err := w.Write(t.Data); err != nil { + if _, err := w.Write(t.Data); err != nil { return err - } else if n != dlen { - return fmt.Errorf("somehow wrote the wrong number of bytes...") } return nil } @@ -423,12 +419,7 @@ func (t *Block) UnmarshalCBOR(r io.Reader) error { return err } if maj != cbg.MajArray { - data, err := ioutil.ReadAll(r) - if err != nil { - panic("piss") - } - fmt.Println("STRING DATA: ", string(data)) - return fmt.Errorf("cbor input should be of type array (got %d)", maj) + return fmt.Errorf("cbor input should be of type array") } if extra != 2 { diff --git a/retrieval/client.go b/retrieval/client.go index bc2fa2db3..23e7daa09 100644 --- a/retrieval/client.go +++ b/retrieval/client.go @@ -10,6 +10,7 @@ import ( "github.com/libp2p/go-libp2p-core/host" "github.com/libp2p/go-libp2p-core/network" "github.com/libp2p/go-libp2p-core/peer" + cbg "github.com/whyrusleeping/cbor-gen" "golang.org/x/xerrors" "github.com/filecoin-project/lotus/api" @@ -69,6 +70,7 @@ func (c *Client) Query(ctx context.Context, p discovery.RetrievalPeer, data cid. type clientStream struct { payapi payapi.PaychAPI stream network.Stream + peeker cbg.BytePeeker root cid.Cid size types.BigInt @@ -118,6 +120,7 @@ func (c *Client) RetrieveUnixfs(ctx context.Context, root cid.Cid, size uint64, cst := clientStream{ payapi: c.payapi, stream: s, + peeker: cbg.GetPeeker(s), root: root, size: types.NewInt(size), @@ -174,7 +177,7 @@ func (cst *clientStream) doOneExchange(ctx context.Context, toFetch uint64, out } var resp DealResponse - if err := cborrpc.ReadCborRPC(cst.stream, &resp); err != nil { + if err := cborrpc.ReadCborRPC(cst.peeker, &resp); err != nil { log.Error(err) return err } @@ -206,7 +209,7 @@ func (cst *clientStream) fetchBlocks(toFetch uint64, out io.Writer) error { log.Infof("block %d of %d", i+1, blocksToFetch) var block Block - if err := cborrpc.ReadCborRPC(cst.stream, &block); err != nil { + if err := cborrpc.ReadCborRPC(cst.peeker, &block); err != nil { return xerrors.Errorf("reading fetchBlock response: %w", err) } diff --git a/retrieval/miner.go b/retrieval/miner.go index 39bb6f417..274963a09 100644 --- a/retrieval/miner.go +++ b/retrieval/miner.go @@ -14,14 +14,19 @@ import ( "github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/build" + "github.com/filecoin-project/lotus/chain/address" "github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/lib/cborrpc" "github.com/filecoin-project/lotus/storage/sectorblocks" ) +type RetrMinerApi interface { + PaychVoucherAdd(context.Context, address.Address, *types.SignedVoucher, []byte, types.BigInt) (types.BigInt, error) +} + type Miner struct { sectorBlocks *sectorblocks.SectorBlocks - full api.FullNode + full RetrMinerApi pricePerByte types.BigInt // TODO: Unseal price