lotus/storage/sectorblocks/blocks.go

266 lines
6.0 KiB
Go
Raw Normal View History

2019-08-27 18:45:21 +00:00
package sectorblocks
import (
2019-11-06 12:22:08 +00:00
"bytes"
2019-08-27 18:45:21 +00:00
"context"
"errors"
2019-11-06 17:38:42 +00:00
"io"
2019-08-27 18:45:21 +00:00
"sync"
"github.com/ipfs/go-cid"
"github.com/ipfs/go-datastore"
2019-11-06 12:22:08 +00:00
"github.com/ipfs/go-datastore/namespace"
"github.com/ipfs/go-datastore/query"
blockstore "github.com/ipfs/go-ipfs-blockstore"
2019-08-27 18:45:21 +00:00
dshelp "github.com/ipfs/go-ipfs-ds-help"
files "github.com/ipfs/go-ipfs-files"
2019-11-06 12:22:08 +00:00
ipld "github.com/ipfs/go-ipld-format"
"github.com/ipfs/go-unixfs"
"golang.org/x/xerrors"
2019-08-27 18:45:21 +00:00
2019-11-06 12:22:08 +00:00
"github.com/filecoin-project/lotus/api"
2019-11-07 14:11:39 +00:00
"github.com/filecoin-project/lotus/lib/cborutil"
2019-11-06 19:00:04 +00:00
"github.com/filecoin-project/lotus/lib/padreader"
2019-11-06 12:22:08 +00:00
"github.com/filecoin-project/lotus/lib/sectorbuilder"
"github.com/filecoin-project/lotus/node/modules/dtypes"
"github.com/filecoin-project/lotus/storage/sector"
2019-08-27 18:45:21 +00:00
)
type SealSerialization uint8
const (
SerializationUnixfs0 SealSerialization = 'u'
)
var dsPrefix = datastore.NewKey("/sealedblocks")
2019-08-27 23:03:22 +00:00
var imBlocksPrefix = datastore.NewKey("/intermediate")
2019-08-27 18:45:21 +00:00
var ErrNotFound = errors.New("not found")
2019-08-27 18:45:21 +00:00
type SectorBlocks struct {
*sector.Store
2019-08-27 23:03:22 +00:00
intermediate blockstore.Blockstore // holds intermediate nodes TODO: consider combining with the staging blockstore
2019-08-27 18:45:21 +00:00
unsealed *unsealedBlocks
2019-08-27 19:54:39 +00:00
keys datastore.Batching
keyLk sync.Mutex
2019-08-27 18:45:21 +00:00
}
func NewSectorBlocks(sectst *sector.Store, ds dtypes.MetadataDS, sb *sectorbuilder.SectorBuilder) *SectorBlocks {
sbc := &SectorBlocks{
Store: sectst,
2019-08-27 23:03:22 +00:00
intermediate: blockstore.NewBlockstore(namespace.Wrap(ds, imBlocksPrefix)),
keys: namespace.Wrap(ds, dsPrefix),
2019-08-27 18:45:21 +00:00
}
unsealed := &unsealedBlocks{ // TODO: untangle this
2019-08-27 23:03:22 +00:00
sb: sb,
2019-08-27 19:54:39 +00:00
unsealed: map[string][]byte{},
2019-08-27 18:45:21 +00:00
unsealing: map[string]chan struct{}{},
}
sbc.unsealed = unsealed
return sbc
}
type UnixfsReader interface {
files.File
// ReadBlock reads data from a single unixfs block. Data is nil
// for intermediate nodes
ReadBlock(context.Context) (data []byte, offset uint64, nd ipld.Node, err error)
}
type refStorer struct {
2019-08-27 23:03:22 +00:00
blockReader UnixfsReader
2019-11-06 12:22:08 +00:00
writeRef func(cid cid.Cid, pieceRef string, offset uint64, size uint64) error
2019-08-27 23:03:22 +00:00
intermediate blockstore.Blockstore
2019-08-27 18:45:21 +00:00
pieceRef string
remaining []byte
}
2019-11-06 12:22:08 +00:00
func (st *SectorBlocks) writeRef(cid cid.Cid, pieceRef string, offset uint64, size uint64) error {
2019-08-27 18:45:21 +00:00
st.keyLk.Lock() // TODO: make this multithreaded
defer st.keyLk.Unlock()
v, err := st.keys.Get(dshelp.CidToDsKey(cid))
if err == datastore.ErrNotFound {
err = nil
}
if err != nil {
2019-11-06 12:04:33 +00:00
return xerrors.Errorf("getting existing refs: %w", err)
2019-08-27 18:45:21 +00:00
}
2019-11-06 12:22:08 +00:00
var refs api.SealedRefs
2019-08-27 18:45:21 +00:00
if len(v) > 0 {
2019-11-07 14:11:39 +00:00
if err := cborutil.ReadCborRPC(bytes.NewReader(v), &refs); err != nil {
2019-11-06 12:04:33 +00:00
return xerrors.Errorf("decoding existing refs: %w", err)
2019-08-27 18:45:21 +00:00
}
}
2019-11-06 12:22:08 +00:00
refs.Refs = append(refs.Refs, api.SealedRef{
Piece: pieceRef,
2019-08-27 18:45:21 +00:00
Offset: offset,
Size: size,
})
2019-11-07 14:11:39 +00:00
newRef, err := cborutil.Dump(&refs)
2019-08-27 18:45:21 +00:00
if err != nil {
2019-11-06 12:04:33 +00:00
return xerrors.Errorf("serializing refs: %w", err)
2019-08-27 18:45:21 +00:00
}
return st.keys.Put(dshelp.CidToDsKey(cid), newRef) // TODO: batch somehow
}
func (r *refStorer) Read(p []byte) (n int, err error) {
offset := 0
if len(r.remaining) > 0 {
offset += len(r.remaining)
read := copy(p, r.remaining)
if read == len(r.remaining) {
r.remaining = nil
} else {
r.remaining = r.remaining[read:]
}
return read, nil
}
for {
data, offset, nd, err := r.blockReader.ReadBlock(context.TODO())
if err != nil {
2019-11-06 17:38:42 +00:00
if err == io.EOF {
return 0, io.EOF
}
2019-11-06 12:04:33 +00:00
return 0, xerrors.Errorf("reading block: %w", err)
2019-08-27 18:45:21 +00:00
}
if len(data) == 0 {
2019-08-27 23:03:22 +00:00
// TODO: batch
// TODO: GC
if err := r.intermediate.Put(nd); err != nil {
2019-11-06 12:04:33 +00:00
return 0, xerrors.Errorf("storing intermediate node: %w", err)
2019-08-27 23:03:22 +00:00
}
continue
2019-08-27 18:45:21 +00:00
}
2019-11-06 12:22:08 +00:00
if err := r.writeRef(nd.Cid(), r.pieceRef, offset, uint64(len(data))); err != nil {
2019-11-06 12:04:33 +00:00
return 0, xerrors.Errorf("writing ref: %w", err)
2019-08-27 18:45:21 +00:00
}
read := copy(p, data)
if read < len(data) {
r.remaining = data[read:]
}
// TODO: read multiple
return read, nil
}
}
func (st *SectorBlocks) AddUnixfsPiece(ref cid.Cid, r UnixfsReader, dealID uint64) (sectorID uint64, err error) {
2019-08-27 18:45:21 +00:00
size, err := r.Size()
if err != nil {
return 0, err
}
2019-08-27 23:03:22 +00:00
refst := &refStorer{
blockReader: r,
pieceRef: string(SerializationUnixfs0) + ref.String(),
writeRef: st.writeRef,
intermediate: st.intermediate,
}
2019-08-27 18:45:21 +00:00
2019-11-06 19:00:04 +00:00
pr, psize := padreader.New(r, uint64(size))
return st.Store.AddPiece(refst.pieceRef, psize, pr, dealID)
2019-08-27 18:45:21 +00:00
}
func (st *SectorBlocks) List() (map[cid.Cid][]api.SealedRef, error) {
res, err := st.keys.Query(query.Query{})
if err != nil {
return nil, err
}
ents, err := res.Rest()
if err != nil {
return nil, err
}
out := map[cid.Cid][]api.SealedRef{}
for _, ent := range ents {
refCid, err := dshelp.DsKeyToCid(datastore.RawKey(ent.Key))
if err != nil {
return nil, err
}
2019-11-06 12:22:08 +00:00
var refs api.SealedRefs
2019-11-07 14:11:39 +00:00
if err := cborutil.ReadCborRPC(bytes.NewReader(ent.Value), &refs); err != nil {
2019-08-27 18:45:21 +00:00
return nil, err
}
2019-11-06 12:22:08 +00:00
out[refCid] = refs.Refs
2019-08-27 18:45:21 +00:00
}
return out, nil
}
func (st *SectorBlocks) GetRefs(k cid.Cid) ([]api.SealedRef, error) { // TODO: track local sectors
ent, err := st.keys.Get(dshelp.CidToDsKey(k))
if err == datastore.ErrNotFound {
err = ErrNotFound
}
2019-08-27 18:45:21 +00:00
if err != nil {
return nil, err
}
2019-11-06 12:22:08 +00:00
var refs api.SealedRefs
2019-11-07 14:11:39 +00:00
if err := cborutil.ReadCborRPC(bytes.NewReader(ent), &refs); err != nil {
2019-08-27 18:45:21 +00:00
return nil, err
}
2019-11-06 12:22:08 +00:00
return refs.Refs, nil
2019-08-27 18:45:21 +00:00
}
func (st *SectorBlocks) GetSize(k cid.Cid) (uint64, error) {
blk, err := st.intermediate.Get(k)
if err == blockstore.ErrNotFound {
refs, err := st.GetRefs(k)
if err != nil {
return 0, err
}
return uint64(refs[0].Size), nil
}
if err != nil {
return 0, err
}
nd, err := ipld.Decode(blk)
if err != nil {
return 0, err
}
fsn, err := unixfs.ExtractFSNode(nd)
if err != nil {
return 0, err
}
return fsn.FileSize(), nil
}
2019-08-27 18:45:21 +00:00
func (st *SectorBlocks) Has(k cid.Cid) (bool, error) {
// TODO: ensure sector is still there
return st.keys.Has(dshelp.CidToDsKey(k))
}
func (st *SectorBlocks) SealedBlockstore(approveUnseal func() error) *SectorBlockStore {
return &SectorBlockStore{
2019-08-27 23:03:22 +00:00
intermediate: st.intermediate,
2019-08-27 18:45:21 +00:00
sectorBlocks: st,
approveUnseal: approveUnseal,
}
}