fix sector block ref serialization

This commit is contained in:
Łukasz Magiera 2019-11-06 13:22:08 +01:00
parent cb3965bcf5
commit 40b1f91843
4 changed files with 173 additions and 24 deletions

View File

@ -69,5 +69,9 @@ type StorageMiner interface {
type SealedRef struct { type SealedRef struct {
Piece string Piece string
Offset uint64 Offset uint64
Size uint32 Size uint64
}
type SealedRefs struct {
Refs []SealedRef
} }

View File

@ -127,3 +127,145 @@ func (t *PaymentInfo) UnmarshalCBOR(r io.Reader) error {
return nil return nil
} }
func (t *SealedRef) MarshalCBOR(w io.Writer) error {
if t == nil {
_, err := w.Write(cbg.CborNull)
return err
}
if _, err := w.Write([]byte{131}); err != nil {
return err
}
// t.t.Piece (string) (string)
if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajTextString, uint64(len(t.Piece)))); err != nil {
return err
}
if _, err := w.Write([]byte(t.Piece)); err != nil {
return err
}
// t.t.Offset (uint64) (uint64)
if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajUnsignedInt, uint64(t.Offset))); err != nil {
return err
}
// t.t.Size (uint64) (uint64)
if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajUnsignedInt, uint64(t.Size))); err != nil {
return err
}
return nil
}
func (t *SealedRef) UnmarshalCBOR(r io.Reader) error {
br := cbg.GetPeeker(r)
maj, extra, err := cbg.CborReadHeader(br)
if err != nil {
return err
}
if maj != cbg.MajArray {
return fmt.Errorf("cbor input should be of type array")
}
if extra != 3 {
return fmt.Errorf("cbor input had wrong number of fields")
}
// t.t.Piece (string) (string)
{
sval, err := cbg.ReadString(br)
if err != nil {
return err
}
t.Piece = string(sval)
}
// t.t.Offset (uint64) (uint64)
maj, extra, err = cbg.CborReadHeader(br)
if err != nil {
return err
}
if maj != cbg.MajUnsignedInt {
return fmt.Errorf("wrong type for uint64 field")
}
t.Offset = uint64(extra)
// t.t.Size (uint64) (uint64)
maj, extra, err = cbg.CborReadHeader(br)
if err != nil {
return err
}
if maj != cbg.MajUnsignedInt {
return fmt.Errorf("wrong type for uint64 field")
}
t.Size = uint64(extra)
return nil
}
func (t *SealedRefs) MarshalCBOR(w io.Writer) error {
if t == nil {
_, err := w.Write(cbg.CborNull)
return err
}
if _, err := w.Write([]byte{129}); err != nil {
return err
}
// t.t.Refs ([]api.SealedRef) (slice)
if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajArray, uint64(len(t.Refs)))); err != nil {
return err
}
for _, v := range t.Refs {
if err := v.MarshalCBOR(w); err != nil {
return err
}
}
return nil
}
func (t *SealedRefs) UnmarshalCBOR(r io.Reader) error {
br := cbg.GetPeeker(r)
maj, extra, err := cbg.CborReadHeader(br)
if err != nil {
return err
}
if maj != cbg.MajArray {
return fmt.Errorf("cbor input should be of type array")
}
if extra != 1 {
return fmt.Errorf("cbor input had wrong number of fields")
}
// t.t.Refs ([]api.SealedRef) (slice)
maj, extra, err = cbg.CborReadHeader(br)
if err != nil {
return err
}
if extra > 8192 {
return fmt.Errorf("t.Refs: array too large (%d)", extra)
}
if maj != cbg.MajArray {
return fmt.Errorf("expected cbor array")
}
if extra > 0 {
t.Refs = make([]SealedRef, extra)
}
for i := 0; i < int(extra); i++ {
var v SealedRef
if err := v.UnmarshalCBOR(br); err != nil {
return err
}
t.Refs[i] = v
}
return nil
}

View File

@ -48,6 +48,8 @@ func main() {
err = gen.WriteTupleEncodersToFile("./api/cbor_gen.go", "api", err = gen.WriteTupleEncodersToFile("./api/cbor_gen.go", "api",
api.PaymentInfo{}, api.PaymentInfo{},
api.SealedRef{},
api.SealedRefs{},
) )
if err != nil { if err != nil {
fmt.Println(err) fmt.Println(err)

View File

@ -1,25 +1,26 @@
package sectorblocks package sectorblocks
import ( import (
"bytes"
"context" "context"
"errors" "errors"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/lib/sectorbuilder"
"github.com/filecoin-project/lotus/node/modules/dtypes"
"github.com/ipfs/go-datastore/namespace"
"github.com/ipfs/go-datastore/query"
blockstore "github.com/ipfs/go-ipfs-blockstore"
ipld "github.com/ipfs/go-ipld-format"
"github.com/ipfs/go-unixfs"
"golang.org/x/xerrors"
"sync" "sync"
"github.com/ipfs/go-cid" "github.com/ipfs/go-cid"
"github.com/ipfs/go-datastore" "github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/namespace"
"github.com/ipfs/go-datastore/query"
blockstore "github.com/ipfs/go-ipfs-blockstore"
dshelp "github.com/ipfs/go-ipfs-ds-help" dshelp "github.com/ipfs/go-ipfs-ds-help"
files "github.com/ipfs/go-ipfs-files" files "github.com/ipfs/go-ipfs-files"
cbor "github.com/ipfs/go-ipld-cbor" ipld "github.com/ipfs/go-ipld-format"
"github.com/ipfs/go-unixfs"
"golang.org/x/xerrors"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/lib/cborrpc"
"github.com/filecoin-project/lotus/lib/sectorbuilder"
"github.com/filecoin-project/lotus/node/modules/dtypes"
"github.com/filecoin-project/lotus/storage/sector" "github.com/filecoin-project/lotus/storage/sector"
) )
@ -74,14 +75,14 @@ type UnixfsReader interface {
type refStorer struct { type refStorer struct {
blockReader UnixfsReader blockReader UnixfsReader
writeRef func(cid cid.Cid, pieceRef string, offset uint64, size uint32) error writeRef func(cid cid.Cid, pieceRef string, offset uint64, size uint64) error
intermediate blockstore.Blockstore intermediate blockstore.Blockstore
pieceRef string pieceRef string
remaining []byte remaining []byte
} }
func (st *SectorBlocks) writeRef(cid cid.Cid, pieceRef string, offset uint64, size uint32) error { func (st *SectorBlocks) writeRef(cid cid.Cid, pieceRef string, offset uint64, size uint64) error {
st.keyLk.Lock() // TODO: make this multithreaded st.keyLk.Lock() // TODO: make this multithreaded
defer st.keyLk.Unlock() defer st.keyLk.Unlock()
@ -93,20 +94,20 @@ func (st *SectorBlocks) writeRef(cid cid.Cid, pieceRef string, offset uint64, si
return xerrors.Errorf("getting existing refs: %w", err) return xerrors.Errorf("getting existing refs: %w", err)
} }
var refs []api.SealedRef var refs api.SealedRefs
if len(v) > 0 { if len(v) > 0 {
if err := cbor.DecodeInto(v, &refs); err != nil { if err := cborrpc.ReadCborRPC(bytes.NewReader(v), &refs); err != nil {
return xerrors.Errorf("decoding existing refs: %w", err) return xerrors.Errorf("decoding existing refs: %w", err)
} }
} }
refs = append(refs, api.SealedRef{ refs.Refs = append(refs.Refs, api.SealedRef{
Piece: pieceRef, Piece: pieceRef,
Offset: offset, Offset: offset,
Size: size, Size: size,
}) })
newRef, err := cbor.DumpObject(&refs) newRef, err := cborrpc.Dump(&refs)
if err != nil { if err != nil {
return xerrors.Errorf("serializing refs: %w", err) return xerrors.Errorf("serializing refs: %w", err)
} }
@ -141,7 +142,7 @@ func (r *refStorer) Read(p []byte) (n int, err error) {
continue continue
} }
if err := r.writeRef(nd.Cid(), r.pieceRef, offset, uint32(len(data))); err != nil { if err := r.writeRef(nd.Cid(), r.pieceRef, offset, uint64(len(data))); err != nil {
return 0, xerrors.Errorf("writing ref: %w", err) return 0, xerrors.Errorf("writing ref: %w", err)
} }
@ -188,12 +189,12 @@ func (st *SectorBlocks) List() (map[cid.Cid][]api.SealedRef, error) {
return nil, err return nil, err
} }
var refs []api.SealedRef var refs api.SealedRefs
if err := cbor.DecodeInto(ent.Value, &refs); err != nil { if err := cborrpc.ReadCborRPC(bytes.NewReader(ent.Value), &refs); err != nil {
return nil, err return nil, err
} }
out[refCid] = refs out[refCid] = refs.Refs
} }
return out, nil return out, nil
@ -208,12 +209,12 @@ func (st *SectorBlocks) GetRefs(k cid.Cid) ([]api.SealedRef, error) { // TODO: t
return nil, err return nil, err
} }
var refs []api.SealedRef var refs api.SealedRefs
if err := cbor.DecodeInto(ent, &refs); err != nil { if err := cborrpc.ReadCborRPC(bytes.NewReader(ent), &refs); err != nil {
return nil, err return nil, err
} }
return refs, nil return refs.Refs, nil
} }
func (st *SectorBlocks) GetSize(k cid.Cid) (uint64, error) { func (st *SectorBlocks) GetSize(k cid.Cid) (uint64, error) {