Make deals work with chunked files
This commit is contained in:
parent
c0566399c6
commit
a5d3122ba5
@ -5,6 +5,6 @@ package build
|
|||||||
const UnixfsChunkSize uint64 = 1 << 20
|
const UnixfsChunkSize uint64 = 1 << 20
|
||||||
const UnixfsLinksPerLevel = 1024
|
const UnixfsLinksPerLevel = 1024
|
||||||
|
|
||||||
const SectorSize = 1024
|
const SectorSize = 16 << 20
|
||||||
|
|
||||||
// TODO: Move other important consts here
|
// TODO: Move other important consts here
|
||||||
|
@ -31,7 +31,7 @@ func (sm *StorageMinerAPI) ActorAddresses(context.Context) ([]address.Address, e
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (sm *StorageMinerAPI) StoreGarbageData(ctx context.Context) (uint64, error) {
|
func (sm *StorageMinerAPI) StoreGarbageData(ctx context.Context) (uint64, error) {
|
||||||
size := uint64(build.SectorSize - 8) // this is the most data we can fit in a sector. TODO: check if correct
|
size := sectorbuilder.UserBytesForSectorSize(build.SectorSize)
|
||||||
|
|
||||||
name := fmt.Sprintf("fake-file-%d", rand.Intn(100000000))
|
name := fmt.Sprintf("fake-file-%d", rand.Intn(100000000))
|
||||||
sectorId, err := sm.Sectors.AddPiece(name, size, io.LimitReader(rand.New(rand.NewSource(42)), 1016))
|
sectorId, err := sm.Sectors.AddPiece(name, size, io.LimitReader(rand.New(rand.NewSource(42)), 1016))
|
||||||
|
@ -7,6 +7,7 @@ import (
|
|||||||
"github.com/filecoin-project/go-lotus/node/modules/dtypes"
|
"github.com/filecoin-project/go-lotus/node/modules/dtypes"
|
||||||
"github.com/ipfs/go-datastore/namespace"
|
"github.com/ipfs/go-datastore/namespace"
|
||||||
"github.com/ipfs/go-datastore/query"
|
"github.com/ipfs/go-datastore/query"
|
||||||
|
blockstore "github.com/ipfs/go-ipfs-blockstore"
|
||||||
ipld "github.com/ipfs/go-ipld-format"
|
ipld "github.com/ipfs/go-ipld-format"
|
||||||
"sync"
|
"sync"
|
||||||
|
|
||||||
@ -26,10 +27,13 @@ const (
|
|||||||
)
|
)
|
||||||
|
|
||||||
var dsPrefix = datastore.NewKey("/sealedblocks")
|
var dsPrefix = datastore.NewKey("/sealedblocks")
|
||||||
|
var imBlocksPrefix = datastore.NewKey("/intermediate")
|
||||||
|
|
||||||
type SectorBlocks struct {
|
type SectorBlocks struct {
|
||||||
*sector.Store
|
*sector.Store
|
||||||
|
|
||||||
|
intermediate blockstore.Blockstore // holds intermediate nodes TODO: consider combining with the staging blockstore
|
||||||
|
|
||||||
unsealed *unsealedBlocks
|
unsealed *unsealedBlocks
|
||||||
keys datastore.Batching
|
keys datastore.Batching
|
||||||
keyLk sync.Mutex
|
keyLk sync.Mutex
|
||||||
@ -38,11 +42,15 @@ type SectorBlocks struct {
|
|||||||
func NewSectorBlocks(sectst *sector.Store, ds dtypes.MetadataDS, sb *sectorbuilder.SectorBuilder) *SectorBlocks {
|
func NewSectorBlocks(sectst *sector.Store, ds dtypes.MetadataDS, sb *sectorbuilder.SectorBuilder) *SectorBlocks {
|
||||||
sbc := &SectorBlocks{
|
sbc := &SectorBlocks{
|
||||||
Store: sectst,
|
Store: sectst,
|
||||||
keys: namespace.Wrap(ds, dsPrefix),
|
|
||||||
|
intermediate: blockstore.NewBlockstore(namespace.Wrap(ds, imBlocksPrefix)),
|
||||||
|
|
||||||
|
keys: namespace.Wrap(ds, dsPrefix),
|
||||||
}
|
}
|
||||||
|
|
||||||
unsealed := &unsealedBlocks{ // TODO: untangle this
|
unsealed := &unsealedBlocks{ // TODO: untangle this
|
||||||
sb: sb,
|
sb: sb,
|
||||||
|
|
||||||
unsealed: map[string][]byte{},
|
unsealed: map[string][]byte{},
|
||||||
unsealing: map[string]chan struct{}{},
|
unsealing: map[string]chan struct{}{},
|
||||||
}
|
}
|
||||||
@ -60,8 +68,9 @@ type UnixfsReader interface {
|
|||||||
}
|
}
|
||||||
|
|
||||||
type refStorer struct {
|
type refStorer struct {
|
||||||
blockReader UnixfsReader
|
blockReader UnixfsReader
|
||||||
writeRef func(cid cid.Cid, offset uint64, size uint32) error
|
writeRef func(cid cid.Cid, offset uint64, size uint32) error
|
||||||
|
intermediate blockstore.Blockstore
|
||||||
|
|
||||||
pieceRef string
|
pieceRef string
|
||||||
remaining []byte
|
remaining []byte
|
||||||
@ -119,7 +128,12 @@ func (r *refStorer) Read(p []byte) (n int, err error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
if len(data) == 0 {
|
if len(data) == 0 {
|
||||||
panic("Handle intermediate nodes") // TODO: !
|
// TODO: batch
|
||||||
|
// TODO: GC
|
||||||
|
if err := r.intermediate.Put(nd); err != nil {
|
||||||
|
return 0, err
|
||||||
|
}
|
||||||
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := r.writeRef(nd.Cid(), offset, uint32(len(data))); err != nil {
|
if err := r.writeRef(nd.Cid(), offset, uint32(len(data))); err != nil {
|
||||||
@ -141,7 +155,12 @@ func (st *SectorBlocks) AddUnixfsPiece(ref cid.Cid, r UnixfsReader, keepAtLeast
|
|||||||
return 0, err
|
return 0, err
|
||||||
}
|
}
|
||||||
|
|
||||||
refst := &refStorer{blockReader: r, pieceRef: string(SerializationUnixfs0) + ref.String(), writeRef: st.writeRef}
|
refst := &refStorer{
|
||||||
|
blockReader: r,
|
||||||
|
pieceRef: string(SerializationUnixfs0) + ref.String(),
|
||||||
|
writeRef: st.writeRef,
|
||||||
|
intermediate: st.intermediate,
|
||||||
|
}
|
||||||
|
|
||||||
return st.Store.AddPiece(refst.pieceRef, uint64(size), refst)
|
return st.Store.AddPiece(refst.pieceRef, uint64(size), refst)
|
||||||
}
|
}
|
||||||
@ -196,7 +215,7 @@ func (st *SectorBlocks) Has(k cid.Cid) (bool, error) {
|
|||||||
|
|
||||||
func (st *SectorBlocks) SealedBlockstore(approveUnseal func() error) *SectorBlockStore {
|
func (st *SectorBlocks) SealedBlockstore(approveUnseal func() error) *SectorBlockStore {
|
||||||
return &SectorBlockStore{
|
return &SectorBlockStore{
|
||||||
//local: nil, // TODO: Pass staging
|
intermediate: st.intermediate,
|
||||||
sectorBlocks: st,
|
sectorBlocks: st,
|
||||||
approveUnseal: approveUnseal,
|
approveUnseal: approveUnseal,
|
||||||
}
|
}
|
||||||
|
@ -8,7 +8,7 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
type SectorBlockStore struct {
|
type SectorBlockStore struct {
|
||||||
// local blockstore.Blockstore // staging before GC // TODO: Pass staging
|
intermediate blockstore.Blockstore
|
||||||
sectorBlocks *SectorBlocks
|
sectorBlocks *SectorBlocks
|
||||||
|
|
||||||
approveUnseal func() error
|
approveUnseal func() error
|
||||||
@ -38,25 +38,25 @@ func (s *SectorBlockStore) HashOnRead(enabled bool) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (s *SectorBlockStore) Has(c cid.Cid) (bool, error) {
|
func (s *SectorBlockStore) Has(c cid.Cid) (bool, error) {
|
||||||
/*has, err := s.local.Has(c) // TODO: Pass staging
|
has, err := s.intermediate.Has(c)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return false, err
|
return false, err
|
||||||
}
|
}
|
||||||
if has {
|
if has {
|
||||||
return true, nil
|
return true, nil
|
||||||
}*/
|
}
|
||||||
|
|
||||||
return s.sectorBlocks.Has(c)
|
return s.sectorBlocks.Has(c)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *SectorBlockStore) Get(c cid.Cid) (blocks.Block, error) {
|
func (s *SectorBlockStore) Get(c cid.Cid) (blocks.Block, error) {
|
||||||
/*val, err := s.local.Get(c) // TODO: Pass staging
|
val, err := s.intermediate.Get(c)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
return val, nil
|
return val, nil
|
||||||
}
|
}
|
||||||
if err != blockstore.ErrNotFound {
|
if err != blockstore.ErrNotFound {
|
||||||
return nil, err
|
return nil, err
|
||||||
}*/
|
}
|
||||||
|
|
||||||
refs, err := s.sectorBlocks.GetRefs(c)
|
refs, err := s.sectorBlocks.GetRefs(c)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
Loading…
Reference in New Issue
Block a user