From 40b1f918433fcf6efd5bc801cca0118112124952 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Wed, 6 Nov 2019 13:22:08 +0100 Subject: [PATCH] fix sector block ref serialization --- api/api_storage.go | 6 +- api/cbor_gen.go | 142 +++++++++++++++++++++++++++++++++ gen/main.go | 2 + storage/sectorblocks/blocks.go | 47 +++++------ 4 files changed, 173 insertions(+), 24 deletions(-) diff --git a/api/api_storage.go b/api/api_storage.go index 7a392455f..75fc55849 100644 --- a/api/api_storage.go +++ b/api/api_storage.go @@ -69,5 +69,9 @@ type StorageMiner interface { type SealedRef struct { Piece string Offset uint64 - Size uint32 + Size uint64 +} + +type SealedRefs struct { + Refs []SealedRef } diff --git a/api/cbor_gen.go b/api/cbor_gen.go index 6f71bbaed..0f968865b 100644 --- a/api/cbor_gen.go +++ b/api/cbor_gen.go @@ -127,3 +127,145 @@ func (t *PaymentInfo) UnmarshalCBOR(r io.Reader) error { return nil } + +func (t *SealedRef) MarshalCBOR(w io.Writer) error { + if t == nil { + _, err := w.Write(cbg.CborNull) + return err + } + if _, err := w.Write([]byte{131}); err != nil { + return err + } + + // t.t.Piece (string) (string) + if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajTextString, uint64(len(t.Piece)))); err != nil { + return err + } + if _, err := w.Write([]byte(t.Piece)); err != nil { + return err + } + + // t.t.Offset (uint64) (uint64) + if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajUnsignedInt, uint64(t.Offset))); err != nil { + return err + } + + // t.t.Size (uint64) (uint64) + if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajUnsignedInt, uint64(t.Size))); err != nil { + return err + } + return nil +} + +func (t *SealedRef) UnmarshalCBOR(r io.Reader) error { + br := cbg.GetPeeker(r) + + maj, extra, err := cbg.CborReadHeader(br) + if err != nil { + return err + } + if maj != cbg.MajArray { + return fmt.Errorf("cbor input should be of type array") + } + + if extra != 3 { + return fmt.Errorf("cbor input had wrong number of fields") + } + + // t.t.Piece (string) (string) + + { + sval, err := cbg.ReadString(br) + if err != nil { + return err + } + + t.Piece = string(sval) + } + // t.t.Offset (uint64) (uint64) + + maj, extra, err = cbg.CborReadHeader(br) + if err != nil { + return err + } + if maj != cbg.MajUnsignedInt { + return fmt.Errorf("wrong type for uint64 field") + } + t.Offset = uint64(extra) + // t.t.Size (uint64) (uint64) + + maj, extra, err = cbg.CborReadHeader(br) + if err != nil { + return err + } + if maj != cbg.MajUnsignedInt { + return fmt.Errorf("wrong type for uint64 field") + } + t.Size = uint64(extra) + return nil +} + +func (t *SealedRefs) MarshalCBOR(w io.Writer) error { + if t == nil { + _, err := w.Write(cbg.CborNull) + return err + } + if _, err := w.Write([]byte{129}); err != nil { + return err + } + + // t.t.Refs ([]api.SealedRef) (slice) + if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajArray, uint64(len(t.Refs)))); err != nil { + return err + } + for _, v := range t.Refs { + if err := v.MarshalCBOR(w); err != nil { + return err + } + } + return nil +} + +func (t *SealedRefs) UnmarshalCBOR(r io.Reader) error { + br := cbg.GetPeeker(r) + + maj, extra, err := cbg.CborReadHeader(br) + if err != nil { + return err + } + if maj != cbg.MajArray { + return fmt.Errorf("cbor input should be of type array") + } + + if extra != 1 { + return fmt.Errorf("cbor input had wrong number of fields") + } + + // t.t.Refs ([]api.SealedRef) (slice) + + maj, extra, err = cbg.CborReadHeader(br) + if err != nil { + return err + } + if extra > 8192 { + return fmt.Errorf("t.Refs: array too large (%d)", extra) + } + + if maj != cbg.MajArray { + return fmt.Errorf("expected cbor array") + } + if extra > 0 { + t.Refs = make([]SealedRef, extra) + } + for i := 0; i < int(extra); i++ { + + var v SealedRef + if err := v.UnmarshalCBOR(br); err != nil { + return err + } + + t.Refs[i] = v + } + + return nil +} diff --git a/gen/main.go b/gen/main.go index 1aac6ef06..296b7bbf4 100644 --- a/gen/main.go +++ b/gen/main.go @@ -48,6 +48,8 @@ func main() { err = gen.WriteTupleEncodersToFile("./api/cbor_gen.go", "api", api.PaymentInfo{}, + api.SealedRef{}, + api.SealedRefs{}, ) if err != nil { fmt.Println(err) diff --git a/storage/sectorblocks/blocks.go b/storage/sectorblocks/blocks.go index 4f1049f23..d8666b41c 100644 --- a/storage/sectorblocks/blocks.go +++ b/storage/sectorblocks/blocks.go @@ -1,25 +1,26 @@ package sectorblocks import ( + "bytes" "context" "errors" - "github.com/filecoin-project/lotus/api" - "github.com/filecoin-project/lotus/lib/sectorbuilder" - "github.com/filecoin-project/lotus/node/modules/dtypes" - "github.com/ipfs/go-datastore/namespace" - "github.com/ipfs/go-datastore/query" - blockstore "github.com/ipfs/go-ipfs-blockstore" - ipld "github.com/ipfs/go-ipld-format" - "github.com/ipfs/go-unixfs" - "golang.org/x/xerrors" "sync" "github.com/ipfs/go-cid" "github.com/ipfs/go-datastore" + "github.com/ipfs/go-datastore/namespace" + "github.com/ipfs/go-datastore/query" + blockstore "github.com/ipfs/go-ipfs-blockstore" dshelp "github.com/ipfs/go-ipfs-ds-help" files "github.com/ipfs/go-ipfs-files" - cbor "github.com/ipfs/go-ipld-cbor" + ipld "github.com/ipfs/go-ipld-format" + "github.com/ipfs/go-unixfs" + "golang.org/x/xerrors" + "github.com/filecoin-project/lotus/api" + "github.com/filecoin-project/lotus/lib/cborrpc" + "github.com/filecoin-project/lotus/lib/sectorbuilder" + "github.com/filecoin-project/lotus/node/modules/dtypes" "github.com/filecoin-project/lotus/storage/sector" ) @@ -74,14 +75,14 @@ type UnixfsReader interface { type refStorer struct { blockReader UnixfsReader - writeRef func(cid cid.Cid, pieceRef string, offset uint64, size uint32) error + writeRef func(cid cid.Cid, pieceRef string, offset uint64, size uint64) error intermediate blockstore.Blockstore pieceRef string remaining []byte } -func (st *SectorBlocks) writeRef(cid cid.Cid, pieceRef string, offset uint64, size uint32) error { +func (st *SectorBlocks) writeRef(cid cid.Cid, pieceRef string, offset uint64, size uint64) error { st.keyLk.Lock() // TODO: make this multithreaded defer st.keyLk.Unlock() @@ -93,20 +94,20 @@ func (st *SectorBlocks) writeRef(cid cid.Cid, pieceRef string, offset uint64, si return xerrors.Errorf("getting existing refs: %w", err) } - var refs []api.SealedRef + var refs api.SealedRefs if len(v) > 0 { - if err := cbor.DecodeInto(v, &refs); err != nil { + if err := cborrpc.ReadCborRPC(bytes.NewReader(v), &refs); err != nil { return xerrors.Errorf("decoding existing refs: %w", err) } } - refs = append(refs, api.SealedRef{ + refs.Refs = append(refs.Refs, api.SealedRef{ Piece: pieceRef, Offset: offset, Size: size, }) - newRef, err := cbor.DumpObject(&refs) + newRef, err := cborrpc.Dump(&refs) if err != nil { return xerrors.Errorf("serializing refs: %w", err) } @@ -141,7 +142,7 @@ func (r *refStorer) Read(p []byte) (n int, err error) { continue } - if err := r.writeRef(nd.Cid(), r.pieceRef, offset, uint32(len(data))); err != nil { + if err := r.writeRef(nd.Cid(), r.pieceRef, offset, uint64(len(data))); err != nil { return 0, xerrors.Errorf("writing ref: %w", err) } @@ -188,12 +189,12 @@ func (st *SectorBlocks) List() (map[cid.Cid][]api.SealedRef, error) { return nil, err } - var refs []api.SealedRef - if err := cbor.DecodeInto(ent.Value, &refs); err != nil { + var refs api.SealedRefs + if err := cborrpc.ReadCborRPC(bytes.NewReader(ent.Value), &refs); err != nil { return nil, err } - out[refCid] = refs + out[refCid] = refs.Refs } return out, nil @@ -208,12 +209,12 @@ func (st *SectorBlocks) GetRefs(k cid.Cid) ([]api.SealedRef, error) { // TODO: t return nil, err } - var refs []api.SealedRef - if err := cbor.DecodeInto(ent, &refs); err != nil { + var refs api.SealedRefs + if err := cborrpc.ReadCborRPC(bytes.NewReader(ent), &refs); err != nil { return nil, err } - return refs, nil + return refs.Refs, nil } func (st *SectorBlocks) GetSize(k cid.Cid) (uint64, error) {