diff --git a/build/params.go b/build/params.go index 0901bebc7..21c102e9f 100644 --- a/build/params.go +++ b/build/params.go @@ -5,6 +5,6 @@ package build const UnixfsChunkSize uint64 = 1 << 20 const UnixfsLinksPerLevel = 1024 -const SectorSize = 1024 +const SectorSize = 16 << 20 // TODO: Move other important consts here diff --git a/node/impl/storminer.go b/node/impl/storminer.go index 11bfe39f2..41371fa66 100644 --- a/node/impl/storminer.go +++ b/node/impl/storminer.go @@ -31,7 +31,7 @@ func (sm *StorageMinerAPI) ActorAddresses(context.Context) ([]address.Address, e } func (sm *StorageMinerAPI) StoreGarbageData(ctx context.Context) (uint64, error) { - size := uint64(build.SectorSize - 8) // this is the most data we can fit in a sector. TODO: check if correct + size := sectorbuilder.UserBytesForSectorSize(build.SectorSize) name := fmt.Sprintf("fake-file-%d", rand.Intn(100000000)) sectorId, err := sm.Sectors.AddPiece(name, size, io.LimitReader(rand.New(rand.NewSource(42)), 1016)) diff --git a/storage/sectorblocks/blocks.go b/storage/sectorblocks/blocks.go index 76b0fe045..cebb8132c 100644 --- a/storage/sectorblocks/blocks.go +++ b/storage/sectorblocks/blocks.go @@ -7,6 +7,7 @@ import ( "github.com/filecoin-project/go-lotus/node/modules/dtypes" "github.com/ipfs/go-datastore/namespace" "github.com/ipfs/go-datastore/query" + blockstore "github.com/ipfs/go-ipfs-blockstore" ipld "github.com/ipfs/go-ipld-format" "sync" @@ -26,10 +27,13 @@ const ( ) var dsPrefix = datastore.NewKey("/sealedblocks") +var imBlocksPrefix = datastore.NewKey("/intermediate") type SectorBlocks struct { *sector.Store + intermediate blockstore.Blockstore // holds intermediate nodes TODO: consider combining with the staging blockstore + unsealed *unsealedBlocks keys datastore.Batching keyLk sync.Mutex @@ -38,11 +42,15 @@ type SectorBlocks struct { func NewSectorBlocks(sectst *sector.Store, ds dtypes.MetadataDS, sb *sectorbuilder.SectorBuilder) *SectorBlocks { sbc := &SectorBlocks{ Store: sectst, - keys: namespace.Wrap(ds, dsPrefix), + + intermediate: blockstore.NewBlockstore(namespace.Wrap(ds, imBlocksPrefix)), + + keys: namespace.Wrap(ds, dsPrefix), } unsealed := &unsealedBlocks{ // TODO: untangle this - sb: sb, + sb: sb, + unsealed: map[string][]byte{}, unsealing: map[string]chan struct{}{}, } @@ -60,8 +68,9 @@ type UnixfsReader interface { } type refStorer struct { - blockReader UnixfsReader - writeRef func(cid cid.Cid, offset uint64, size uint32) error + blockReader UnixfsReader + writeRef func(cid cid.Cid, offset uint64, size uint32) error + intermediate blockstore.Blockstore pieceRef string remaining []byte @@ -119,7 +128,12 @@ func (r *refStorer) Read(p []byte) (n int, err error) { } if len(data) == 0 { - panic("Handle intermediate nodes") // TODO: ! + // TODO: batch + // TODO: GC + if err := r.intermediate.Put(nd); err != nil { + return 0, err + } + continue } if err := r.writeRef(nd.Cid(), offset, uint32(len(data))); err != nil { @@ -141,7 +155,12 @@ func (st *SectorBlocks) AddUnixfsPiece(ref cid.Cid, r UnixfsReader, keepAtLeast return 0, err } - refst := &refStorer{blockReader: r, pieceRef: string(SerializationUnixfs0) + ref.String(), writeRef: st.writeRef} + refst := &refStorer{ + blockReader: r, + pieceRef: string(SerializationUnixfs0) + ref.String(), + writeRef: st.writeRef, + intermediate: st.intermediate, + } return st.Store.AddPiece(refst.pieceRef, uint64(size), refst) } @@ -196,7 +215,7 @@ func (st *SectorBlocks) Has(k cid.Cid) (bool, error) { func (st *SectorBlocks) SealedBlockstore(approveUnseal func() error) *SectorBlockStore { return &SectorBlockStore{ - //local: nil, // TODO: Pass staging + intermediate: st.intermediate, sectorBlocks: st, approveUnseal: approveUnseal, } diff --git a/storage/sectorblocks/blockstore.go b/storage/sectorblocks/blockstore.go index 2087b9b6d..e28e750d0 100644 --- a/storage/sectorblocks/blockstore.go +++ b/storage/sectorblocks/blockstore.go @@ -8,7 +8,7 @@ import ( ) type SectorBlockStore struct { - // local blockstore.Blockstore // staging before GC // TODO: Pass staging + intermediate blockstore.Blockstore sectorBlocks *SectorBlocks approveUnseal func() error @@ -38,25 +38,25 @@ func (s *SectorBlockStore) HashOnRead(enabled bool) { } func (s *SectorBlockStore) Has(c cid.Cid) (bool, error) { - /*has, err := s.local.Has(c) // TODO: Pass staging + has, err := s.intermediate.Has(c) if err != nil { return false, err } if has { return true, nil - }*/ + } return s.sectorBlocks.Has(c) } func (s *SectorBlockStore) Get(c cid.Cid) (blocks.Block, error) { - /*val, err := s.local.Get(c) // TODO: Pass staging + val, err := s.intermediate.Get(c) if err == nil { return val, nil } if err != blockstore.ErrNotFound { return nil, err - }*/ + } refs, err := s.sectorBlocks.GetRefs(c) if err != nil {