package sectorblocks import ( "context" "github.com/filecoin-project/go-lotus/api" "github.com/filecoin-project/go-lotus/node/modules/dtypes" "github.com/ipfs/go-datastore/namespace" "github.com/ipfs/go-datastore/query" "sync" "github.com/ipfs/go-cid" "github.com/ipfs/go-datastore" dshelp "github.com/ipfs/go-ipfs-ds-help" files "github.com/ipfs/go-ipfs-files" cbor "github.com/ipfs/go-ipld-cbor" "github.com/filecoin-project/go-lotus/storage/sector" ) type SealSerialization uint8 const ( SerializationUnixfs0 SealSerialization = 'u' ) var dsPrefix = datastore.NewKey("/sealedblocks") type SectorBlocks struct { *sector.Store keys datastore.Batching keyLk sync.Mutex } func NewSectorBlocks(sectst *sector.Store, ds dtypes.MetadataDS) *SectorBlocks { return &SectorBlocks{ Store: sectst, keys: namespace.Wrap(ds, dsPrefix), } } type UnixfsReader interface { files.File // ReadBlock reads data from a single unixfs block. Data is nil // for intermediate nodes ReadBlock(context.Context) (data []byte, offset uint64, cid cid.Cid, err error) } type refStorer struct { blockReader UnixfsReader writeRef func(cid cid.Cid, offset uint64, size uint32) error pieceRef string remaining []byte } func (st *SectorBlocks) writeRef(cid cid.Cid, offset uint64, size uint32) error { st.keyLk.Lock() // TODO: make this multithreaded defer st.keyLk.Unlock() v, err := st.keys.Get(dshelp.CidToDsKey(cid)) if err == datastore.ErrNotFound { err = nil } if err != nil { return err } var refs []api.SealedRef if len(v) > 0 { if err := cbor.DecodeInto(v, &refs); err != nil { return err } } refs = append(refs, api.SealedRef{ Piece: string(SerializationUnixfs0) + cid.String(), Offset: offset, Size: size, }) newRef, err := cbor.DumpObject(&refs) if err != nil { return err } return st.keys.Put(dshelp.CidToDsKey(cid), newRef) // TODO: batch somehow } func (r *refStorer) Read(p []byte) (n int, err error) { offset := 0 if len(r.remaining) > 0 { offset += len(r.remaining) read := copy(p, r.remaining) if read == len(r.remaining) { r.remaining = nil } else { r.remaining = r.remaining[read:] } return read, nil } for { data, offset, cid, err := r.blockReader.ReadBlock(context.TODO()) if err != nil { return 0, err } if len(data) == 0 { panic("Handle intermediate nodes") // TODO: ! } if err := r.writeRef(cid, offset, uint32(len(data))); err != nil { return 0, err } read := copy(p, data) if read < len(data) { r.remaining = data[read:] } // TODO: read multiple return read, nil } } func (st *SectorBlocks) AddUnixfsPiece(ref cid.Cid, r UnixfsReader, keepAtLeast uint64) (sectorID uint64, err error) { size, err := r.Size() if err != nil { return 0, err } refst := &refStorer{blockReader: r, pieceRef: string(SerializationUnixfs0) + ref.String(), writeRef: st.writeRef} return st.Store.AddPiece(refst.pieceRef, uint64(size), refst) } func (st *SectorBlocks) List() (map[cid.Cid][]api.SealedRef, error) { res, err := st.keys.Query(query.Query{}) if err != nil { return nil, err } ents, err := res.Rest() if err != nil { return nil, err } out := map[cid.Cid][]api.SealedRef{} for _, ent := range ents { refCid, err := dshelp.DsKeyToCid(datastore.RawKey(ent.Key)) if err != nil { return nil, err } var refs []api.SealedRef if err := cbor.DecodeInto(ent.Value, &refs); err != nil { return nil, err } out[refCid] = refs } return out, nil } func (st *SectorBlocks) GetRefs(k cid.Cid) ([]api.SealedRef, error) { // TODO: track unsealed sectors ent, err := st.keys.Get(dshelp.CidToDsKey(k)) if err != nil { return nil, err } var refs []api.SealedRef if err := cbor.DecodeInto(ent, &refs); err != nil { return nil, err } return refs, nil }