From be30bc79a5f3311c51a78fc32138c5a002aadff5 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=C5=81ukasz=20Magiera?=
Date: Tue, 27 Aug 2019 20:45:21 +0200
Subject: [PATCH] Retrieval works!

---
 api/api.go                         |  22 ++++
 api/struct.go                      |   5 +
 chain/sub/incoming.go              |   4 +-
 chain/sync.go                      |   2 +-
 cli/client.go                      |  20 +--
 go.sum                             |   1 +
 miner/miner.go                     |   2 +-
 node/impl/full/client.go           |   6 +-
 node/modules/storageminer.go       |   3 +-
 retrieval/client.go                |  32 +++--
 retrieval/miner.go                 | 152 ++++++++++++++++++++-
 retrieval/types.go                 |  20 +--
 storage/sectorblocks/blocks.go     | 204 +++++++++++++++++++++++++++++
 storage/sectorblocks/blockstore.go | 190 +++++++--------------------
 storage/sectorblocks/unsealed.go   |  98 ++++++++++++++
 15 files changed, 576 insertions(+), 185 deletions(-)
 create mode 100644 storage/sectorblocks/blocks.go
 create mode 100644 storage/sectorblocks/unsealed.go

diff --git a/api/api.go b/api/api.go
index 17f633d64..8130df873 100644
--- a/api/api.go
+++ b/api/api.go
@@ -93,6 +93,7 @@ type FullNode interface {
 	ClientStartDeal(ctx context.Context, data cid.Cid, miner address.Address, price types.BigInt, blocksDuration uint64) (*cid.Cid, error)
 	ClientHasLocal(ctx context.Context, root cid.Cid) (bool, error)
 	ClientFindData(ctx context.Context, root cid.Cid) ([]QueryOffer, error) // TODO: specify serialization mode we want (defaults to unixfs for now)
+	ClientRetrieve(ctx context.Context, order RetrievalOrder) error // TODO: maybe just allow putting this straight into some file

 	// ClientUnimport removes references to the specified file from filestore
 	//ClientUnimport(path string)
@@ -196,9 +197,30 @@ type SealedRef struct {

 type QueryOffer struct {
 	Err string

+	Root cid.Cid

+	Size     uint64
 	MinPrice types.BigInt
 	Miner    address.Address
 	MinerPeerID peer.ID
 }
+
+func (o *QueryOffer) Order() RetrievalOrder {
+	return RetrievalOrder{
+		Root: o.Root,
+		Size: o.Size,
+
+		Miner:       o.Miner,
+		MinerPeerID: o.MinerPeerID,
+	}
+}
+
+type RetrievalOrder struct {
+	// TODO: make this less unixfs specific
+	Root cid.Cid
+	Size uint64
+	// TODO: support offset
+
+	Miner       address.Address
+	MinerPeerID peer.ID
+}
diff --git a/api/struct.go b/api/struct.go
index f3e6d449c..00424bfbe 100644
--- a/api/struct.go
+++ b/api/struct.go
@@ -73,6 +73,7 @@ type FullNodeStruct struct {
 		ClientHasLocal  func(ctx context.Context, root cid.Cid) (bool, error)                                                                       `perm:"write"`
 		ClientFindData  func(ctx context.Context, root cid.Cid) ([]QueryOffer, error)                                                               `perm:"read"`
 		ClientStartDeal func(ctx context.Context, data cid.Cid, miner address.Address, price types.BigInt, blocksDuration uint64) (*cid.Cid, error) `perm:"admin"`
+		ClientRetrieve  func(ctx context.Context, order RetrievalOrder) error                                                                       `perm:"admin"`

 		StateMinerSectors    func(context.Context, address.Address) ([]*SectorInfo, error) `perm:"read"`
 		StateMinerProvingSet func(context.Context, address.Address) ([]*SectorInfo, error) `perm:"read"`
@@ -166,6 +167,10 @@ func (c *FullNodeStruct) ClientStartDeal(ctx context.Context, data cid.Cid, mine
 	return c.Internal.ClientStartDeal(ctx, data, miner, price, blocksDuration)
 }

+func (c *FullNodeStruct) ClientRetrieve(ctx context.Context, order RetrievalOrder) error {
+	return c.Internal.ClientRetrieve(ctx, order)
+}
+
 func (c *FullNodeStruct) MpoolPending(ctx context.Context, ts *types.TipSet) ([]*types.SignedMessage, error) {
 	return c.Internal.MpoolPending(ctx, ts)
 }
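For reference, the client-side flow these two API additions enable (and which cli/client.go wires up below) looks roughly like this — a minimal sketch, assuming an already-connected api.FullNode handle; retrieveExample and its offer filtering are illustrative, not part of the patch:

func retrieveExample(ctx context.Context, fn api.FullNode, root cid.Cid) error {
	offers, err := fn.ClientFindData(ctx, root) // query known miners for this cid
	if err != nil {
		return err
	}
	for _, offer := range offers {
		if offer.Err != "" { // the miner answered, but can't serve this root
			continue
		}
		// Order() copies Root/Size/Miner/MinerPeerID into a RetrievalOrder
		if err := fn.ClientRetrieve(ctx, offer.Order()); err == nil {
			return nil // retrieved from this miner
		}
	}
	return xerrors.New("no retrievable offers")
}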
diff --git a/chain/sub/incoming.go b/chain/sub/incoming.go
index 06898eb70..8ff51804c 100644
--- a/chain/sub/incoming.go
+++ b/chain/sub/incoming.go
@@ -28,7 +28,7 @@ func HandleIncomingBlocks(ctx context.Context, bsub *pubsub.Subscription, s *cha
 		}

 		go func() {
-			log.Info("about to fetch messages for block from pubsub")
+			log.Debug("about to fetch messages for block from pubsub")
 			bmsgs, err := s.Bsync.FetchMessagesByCids(context.TODO(), blk.BlsMessages)
 			if err != nil {
 				log.Errorf("failed to fetch all bls messages for block received over pubusb: %s", err)
@@ -41,7 +41,7 @@ func HandleIncomingBlocks(ctx context.Context, bsub *pubsub.Subscription, s *cha
 				return
 			}

-			log.Info("inform new block over pubsub")
+			log.Debug("inform new block over pubsub")
 			s.InformNewBlock(msg.GetFrom(), &types.FullBlock{
 				Header:      blk.Header,
 				BlsMessages: bmsgs,
diff --git a/chain/sync.go b/chain/sync.go
index c0581f28b..33650e258 100644
--- a/chain/sync.go
+++ b/chain/sync.go
@@ -88,7 +88,7 @@ func (syncer *Syncer) InformNewHead(from peer.ID, fts *store.FullTipSet) {
 	}

 	if from == syncer.self {
 		// TODO: this is kindof a hack...
-		log.Infof("got block from ourselves")
+		log.Debug("got block from ourselves")

 		if err := syncer.Sync(fts); err != nil {
 			log.Errorf("failed to sync our own block: %s", err)
diff --git a/cli/client.go b/cli/client.go
index 20293bfb3..4d7f92476 100644
--- a/cli/client.go
+++ b/cli/client.go
@@ -181,7 +181,7 @@ var clientRetrieveCmd = &cli.Command{

 		// Check if we already have this data locally

-		has, err := api.ClientHasLocal(ctx, file)
+		/*has, err := api.ClientHasLocal(ctx, file)
 		if err != nil {
 			return err
 		}
@@ -189,20 +189,20 @@ var clientRetrieveCmd = &cli.Command{
 		if has {
 			fmt.Println("Success: Already in local storage")
 			return nil
-		}
+		}*/ // TODO: uncomment before merge

-		_, err = api.ClientFindData(ctx, file)
+		offers, err := api.ClientFindData(ctx, file)
 		if err != nil {
 			return err
 		}

-		// Find miner which may have this data
+		// TODO: parse offer strings from `client find`, make this smarter

-		// Get merkle proofs (intermediate nodes)
-
-		// if acceptable, make retrieval deals to get data
-		// done
-
-		panic("TODO")
+		order := offers[0].Order()
+		err = api.ClientRetrieve(ctx, order)
+		if err == nil {
+			fmt.Println("Success")
+		}
+		return err
 	},
 }
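Note that the command takes offers[0] blindly, and will panic if ClientFindData returns an empty slice — the "make this smarter" TODO points that way. A sketch of a safer selection step, assuming the types.BigCmp comparison helper used elsewhere in the codebase (cheapestOffer is illustrative, not part of the patch):

// cheapestOffer returns the lowest-priced usable offer, if any.
func cheapestOffer(offers []api.QueryOffer) (api.QueryOffer, bool) {
	best, found := api.QueryOffer{}, false
	for _, o := range offers {
		if o.Err != "" { // query failed, or the miner doesn't have the data
			continue
		}
		if !found || types.BigCmp(o.MinPrice, best.MinPrice) < 0 {
			best, found = o, true
		}
	}
	return best, found
}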
diff --git a/go.sum b/go.sum
index a578cb228..4bc79d4e9 100644
--- a/go.sum
+++ b/go.sum
@@ -485,6 +485,7 @@ github.com/prometheus/client_model v0.0.0-20190129233127-fd36f4220a90/go.mod h1:
 github.com/prometheus/common v0.2.0 h1:kUZDBDTdBVBYBj5Tmh2NZLlF60mfjA27rM34b+cVwNU=
 github.com/prometheus/common v0.2.0/go.mod h1:TNfzLD0ON7rHzMJeJkieUDPYmFC7Snx/y86RQel1bk4=
 github.com/prometheus/common v0.4.1/go.mod h1:TNfzLD0ON7rHzMJeJkieUDPYmFC7Snx/y86RQel1bk4=
+github.com/prometheus/common v0.6.0 h1:kRhiuYSXR3+uv2IbVbZhUxK5zVD/2pp3Gd2PpvPkpEo=
 github.com/prometheus/common v0.6.0/go.mod h1:eBmuwkDJBwy6iBfxCBob6t6dR6ENT/y+J+Zk0j9GMYc=
 github.com/prometheus/procfs v0.0.0-20181005140218-185b4288413d/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk=
 github.com/prometheus/procfs v0.0.0-20190117184657-bf6a532e95b1/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk=
diff --git a/miner/miner.go b/miner/miner.go
index 26e1d4650..e0ff2fd37 100644
--- a/miner/miner.go
+++ b/miner/miner.go
@@ -193,7 +193,7 @@ func (m *Miner) GetBestMiningCandidate() (*MiningBase, error) {
 }

 func (m *Miner) mineOne(ctx context.Context, base *MiningBase) (*chain.BlockMsg, error) {
-	log.Info("attempting to mine a block on:", base.ts.Cids())
+	log.Debug("attempting to mine a block on:", base.ts.Cids())
 	ticket, err := m.scratchTicket(ctx, base)
 	if err != nil {
 		return nil, errors.Wrap(err, "scratching ticket failed")
diff --git a/node/impl/full/client.go b/node/impl/full/client.go
index 4c5a4e89a..f64202965 100644
--- a/node/impl/full/client.go
+++ b/node/impl/full/client.go
@@ -178,7 +178,7 @@ func (a *ClientAPI) ClientImport(ctx context.Context, path string) (cid.Cid, err
 		NoCopy: true,
 	}

-	db, err := params.New(chunker.NewSizeSplitter(file, build.UnixfsChunkSize))
+	db, err := params.New(chunker.NewSizeSplitter(file, int64(build.UnixfsChunkSize)))
 	if err != nil {
 		return cid.Undef, err
 	}
@@ -218,3 +218,7 @@ func (a *ClientAPI) ClientListImports(ctx context.Context) ([]api.Import, error)
 		})
 	}
 }
+
+func (a *ClientAPI) ClientRetrieve(ctx context.Context, order api.RetrievalOrder) error {
+	return a.Retrieval.RetrieveUnixfs(ctx, order.Root, order.Size, order.MinerPeerID, order.Miner)
+}
diff --git a/node/modules/storageminer.go b/node/modules/storageminer.go
index ca4b3f262..8a4bc77bf 100644
--- a/node/modules/storageminer.go
+++ b/node/modules/storageminer.go
@@ -90,7 +90,8 @@ func StorageMiner(mctx helpers.MetricsCtx, lc fx.Lifecycle, api api.FullNode, h
 func HandleRetrieval(host host.Host, lc fx.Lifecycle, m *retrieval.Miner) {
 	lc.Append(fx.Hook{
 		OnStart: func(context.Context) error {
-			host.SetStreamHandler(retrieval.QueryProtocolID, m.HandleStream)
+			host.SetStreamHandler(retrieval.QueryProtocolID, m.HandleQueryStream)
+			host.SetStreamHandler(retrieval.ProtocolID, m.HandleDealStream)
			return nil
 		},
 	})
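The miner side now serves two libp2p protocols: QueryProtocolID (price and availability, handled by HandleQueryStream) and ProtocolID (the data transfer itself, handled by HandleDealStream). For orientation, the client in retrieval/client.go below reaches them by opening plain streams — a sketch, assuming a libp2p host and the protocol ID constants defined in retrieval/types.go (openRetrievalStreams is illustrative, not part of the patch):

// openRetrievalStreams shows how a client reaches each handler registered above.
func openRetrievalStreams(ctx context.Context, h host.Host, miner peer.ID) (network.Stream, network.Stream, error) {
	qs, err := h.NewStream(ctx, miner, retrieval.QueryProtocolID) // served by HandleQueryStream
	if err != nil {
		return nil, nil, err
	}
	ds, err := h.NewStream(ctx, miner, retrieval.ProtocolID) // served by HandleDealStream
	if err != nil {
		qs.Close()
		return nil, nil, err
	}
	return qs, ds, nil
}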
diff --git a/retrieval/client.go b/retrieval/client.go
index b151bd728..143ccb5e3 100644
--- a/retrieval/client.go
+++ b/retrieval/client.go
@@ -4,16 +4,16 @@ import (
 	"context"
 	"io/ioutil"

-	blocks "github.com/ipfs/go-block-format"
-	"github.com/libp2p/go-libp2p-core/network"
-	"github.com/libp2p/go-libp2p-core/peer"
-	"github.com/libp2p/go-msgio"
-	"golang.org/x/xerrors"
 	pb "github.com/ipfs/go-bitswap/message/pb"
+	blocks "github.com/ipfs/go-block-format"
 	"github.com/ipfs/go-cid"
 	cbor "github.com/ipfs/go-ipld-cbor"
 	logging "github.com/ipfs/go-log"
 	"github.com/libp2p/go-libp2p-core/host"
+	"github.com/libp2p/go-libp2p-core/network"
+	"github.com/libp2p/go-libp2p-core/peer"
+	"github.com/libp2p/go-msgio"
+	"golang.org/x/xerrors"

 	"github.com/filecoin-project/go-lotus/api"
 	"github.com/filecoin-project/go-lotus/build"
@@ -62,6 +62,7 @@ func (c *Client) Query(ctx context.Context, p discovery.RetrievalPeer, data cid.
 	}

 	return api.QueryOffer{
+		Root:        data,
 		Size:        resp.Size,
 		MinPrice:    resp.MinPrice,
 		Miner:       p.Address, // TODO: check
@@ -102,13 +103,15 @@ func (c *Client) RetrieveUnixfs(ctx context.Context, root cid.Cid, size uint64,
 		stream: s,

 		root:   root,
-		offset: 0, // TODO: check how much data we have locally
+		offset: 0, // TODO: Check how much data we have locally
+		// TODO: Support in handler
+		// TODO: Allow client to specify this

 		windowSize: build.UnixfsChunkSize,
 		verifier:   &OptimisticVerifier{}, // TODO: Use a real verifier
 	}

-	for {
+	for cst.offset != size {
 		toFetch := cst.windowSize
 		if toFetch+cst.offset > size {
 			toFetch = size - cst.offset
@@ -118,7 +121,11 @@ func (c *Client) RetrieveUnixfs(ctx context.Context, root cid.Cid, size uint64,
 		if err != nil {
 			return err
 		}
+
+		cst.offset += toFetch
 	}
+	log.Info("RETRIEVE SUCCESSFUL")
+	return nil
 }

 func (cst *clientStream) doOneExchange(toFetch uint64) error {
@@ -134,18 +141,19 @@ func (cst *clientStream) doOneExchange(toFetch uint64) error {

 	var resp DealResponse
 	if err := cborrpc.ReadCborRPC(cst.stream, &resp); err != nil {
+		log.Error(err)
 		return err
 	}

-	if resp.AcceptedResponse == nil {
+	if resp.Status != Accepted {
 		cst.windowSize = build.UnixfsChunkSize
 		// TODO: apply some 'penalty' to miner 'reputation' (needs to be the same in both cases)

-		if resp.ErrorResponse != nil {
-			return xerrors.Errorf("storage deal error: %s", resp.ErrorResponse.Message)
+		if resp.Status == Error {
+			return xerrors.Errorf("storage deal error: %s", resp.Message)
 		}

-		if resp.RejectedResponse != nil {
-			return xerrors.Errorf("storage deal rejected: %s", resp.RejectedResponse.Message)
+		if resp.Status == Rejected {
+			return xerrors.Errorf("storage deal rejected: %s", resp.Message)
 		}

 		return xerrors.New("storage deal response had no Accepted section")
 	}
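The retrieval loop advances offset only after doOneExchange succeeds, so it terminates exactly when offset == size; on any non-Accepted response the window collapses back to a single chunk (a no-op until window growth is implemented, per the TODOs). An illustrative trace of the window accounting, assuming windowSize == build.UnixfsChunkSize == 1 MiB (the numbers are assumed, not from the patch):

//	size = 2.5 MiB, windowSize = 1 MiB
//	offset = 0.0 MiB -> toFetch = 1.0 MiB  (full window)
//	offset = 1.0 MiB -> toFetch = 1.0 MiB  (full window)
//	offset = 2.0 MiB -> toFetch = 0.5 MiB  (clamped to size - offset)
//	offset = 2.5 MiB -> loop exits, "RETRIEVE SUCCESSFUL"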
diff --git a/retrieval/miner.go b/retrieval/miner.go
index 5e2525d76..d36f52342 100644
--- a/retrieval/miner.go
+++ b/retrieval/miner.go
@@ -1,24 +1,37 @@
 package retrieval

 import (
-	"github.com/libp2p/go-libp2p-core/network"
+	"context"
+	"github.com/filecoin-project/go-lotus/build"
+	"github.com/ipfs/go-cid"
+	"github.com/libp2p/go-msgio"
+	"golang.org/x/xerrors"

 	"github.com/filecoin-project/go-lotus/chain/types"
 	"github.com/filecoin-project/go-lotus/lib/cborrpc"
 	"github.com/filecoin-project/go-lotus/storage/sectorblocks"
+	pb "github.com/ipfs/go-bitswap/message/pb"
+	"github.com/ipfs/go-blockservice"
+	"github.com/ipfs/go-merkledag"
+	unixfile "github.com/ipfs/go-unixfs/file"
+	"github.com/libp2p/go-libp2p-core/network"
 )

 type Miner struct {
 	sectorBlocks *sectorblocks.SectorBlocks
+
+	pricePerByte types.BigInt
+	// TODO: Unseal price
 }

 func NewMiner(sblks *sectorblocks.SectorBlocks) *Miner {
 	return &Miner{
 		sectorBlocks: sblks,
+
+		pricePerByte: types.NewInt(2), // TODO: allow setting
 	}
 }

-func (m *Miner) HandleStream(stream network.Stream) {
+func (m *Miner) HandleQueryStream(stream network.Stream) {
 	defer stream.Close()

 	var query Query
@@ -40,7 +53,7 @@ func (m *Miner) HandleStream(stream network.Stream) {
 		answer.Status = Available

 		// TODO: get price, look for already unsealed ref to reduce work
-		answer.MinPrice = types.NewInt(uint64(refs[0].Size) * 2) // TODO: Get this from somewhere
+		answer.MinPrice = types.BigMul(types.NewInt(uint64(refs[0].Size)), m.pricePerByte)
 		answer.Size = uint64(refs[0].Size) // TODO: verify on intermediate
 	}
@@ -49,3 +62,136 @@ func (m *Miner) HandleStream(stream network.Stream) {
 		return
 	}
 }
+
+func writeErr(stream network.Stream, err error) {
+	log.Errorf("Retrieval deal error: %s", err)
+	_ = cborrpc.WriteCborRPC(stream, DealResponse{
+		Status:  Error,
+		Message: err.Error(),
+	})
+}
+
+func (m *Miner) HandleDealStream(stream network.Stream) { // TODO: should we block in stream handlers?
+	defer stream.Close()
+
+	var ufsr sectorblocks.UnixfsReader
+	var open cid.Cid
+	var at uint64
+	var size uint64
+
+	for {
+		var deal Deal
+		if err := cborrpc.ReadCborRPC(stream, &deal); err != nil {
+			return
+		}
+
+		if deal.Unixfs0 == nil {
+			writeErr(stream, xerrors.New("unknown deal type"))
+			return
+		}
+
+		// TODO: Verify payment, check how much we can send based on that
+		//  Or reject (possibly returning the payment to retain reputation with the client)
+
+		bstore := m.sectorBlocks.SealedBlockstore(func() error {
+			return nil // TODO: approve unsealing based on amount paid
+		})
+
+		if open != deal.Unixfs0.Root || at != deal.Unixfs0.Offset {
+			if deal.Unixfs0.Offset != 0 {
+				// TODO: Implement SeekBlock (like ReadBlock) in go-unixfs
+				writeErr(stream, xerrors.New("sending merkle proofs for nonzero offset not supported yet"))
+				return
+			}
+			at = deal.Unixfs0.Offset
+
+			ds := merkledag.NewDAGService(blockservice.New(bstore, nil))
+			rootNd, err := ds.Get(context.TODO(), deal.Unixfs0.Root)
+			if err != nil {
+				writeErr(stream, err)
+				return
+			}
+
+			fsr, err := unixfile.NewUnixfsFile(context.TODO(), ds, rootNd)
+			if err != nil {
+				writeErr(stream, err)
+				return
+			}
+
+			var ok bool
+			ufsr, ok = fsr.(sectorblocks.UnixfsReader)
+			if !ok {
+				writeErr(stream, xerrors.Errorf("file %s didn't implement sectorblocks.UnixfsReader", deal.Unixfs0.Root))
+				return
+			}
+
+			isize, err := ufsr.Size()
+			if err != nil {
+				writeErr(stream, err)
+				return
+			}
+			size = uint64(isize)
+		}
+
+		if deal.Unixfs0.Offset+deal.Unixfs0.Size > size {
+			writeErr(stream, xerrors.Errorf("tried to read too much %d+%d > %d", deal.Unixfs0.Offset, deal.Unixfs0.Size, size))
+			return
+		}
+
+		resp := DealResponse{
+			Status: Accepted,
+		}
+		if err := cborrpc.WriteCborRPC(stream, resp); err != nil {
+			log.Errorf("Retrieval query: Write Accepted resp: %s", err)
+			return
+		}
+
+		buf := make([]byte, network.MessageSizeMax)
+		msgw := msgio.NewVarintWriter(stream)
+
+		blocksToSend := (deal.Unixfs0.Size + build.UnixfsChunkSize - 1) / build.UnixfsChunkSize
+		for i := uint64(0); i < blocksToSend; {
+			data, offset, nd, err := ufsr.ReadBlock(context.TODO())
+			if err != nil {
+				writeErr(stream, err)
+				return
+			}
+
+			log.Infof("sending block for a deal: %s", nd.Cid())
+
+			if offset != deal.Unixfs0.Offset {
+				writeErr(stream, xerrors.Errorf("ReadBlock on wrong offset: want %d, got %d", deal.Unixfs0.Offset, offset))
+				return
+			}
+
+			/*if uint64(len(data)) != deal.Unixfs0.Size { // TODO: Fix for internal nodes (and any other node too)
+				writeErr(stream, xerrors.Errorf("ReadBlock data with wrong size: want %d, got %d", deal.Unixfs0.Size, len(data)))
+				return
+			}*/
+
+			block := pb.Message_Block{
+				Prefix: nd.Cid().Prefix().Bytes(),
+				Data:   nd.RawData(),
+			}
+
+			n, err := block.MarshalTo(buf)
+			if err != nil {
+				writeErr(stream, err)
+				return
+			}
+
+			if err := msgw.WriteMsg(buf[:n]); err != nil {
+				log.Error(err)
+				return
+			}
+
+			if len(data) > 0 { // don't count internal nodes
+				i++
+			}
+		}

+		// TODO: set `at`
+
+	}
+
+}
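MinPrice is now derived from the piece size and the per-byte rate instead of the previous inline `* 2`, so a client can mirror the same arithmetic to sanity-check an offer before dealing. A minimal sketch (expectedPrice is illustrative, not part of the patch):

// expectedPrice mirrors the MinPrice computation in HandleQueryStream above.
func expectedPrice(pieceSize uint64, pricePerByte types.BigInt) types.BigInt {
	return types.BigMul(types.NewInt(pieceSize), pricePerByte)
}

// e.g. a 2048-byte piece at the hardcoded pricePerByte of 2:
// expectedPrice(2048, types.NewInt(2)) == types.NewInt(4096)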
diff --git a/retrieval/types.go b/retrieval/types.go
index 31dc07119..bd1a51810 100644
--- a/retrieval/types.go
+++ b/retrieval/types.go
@@ -17,11 +17,20 @@ const (
 	Unavailable
 )

+const (
+	Accepted = iota
+	Error
+	Rejected
+)
+
 func init() {
 	cbor.RegisterCborType(Deal{})

 	cbor.RegisterCborType(Query{})
 	cbor.RegisterCborType(QueryResponse{})
+	cbor.RegisterCborType(Unixfs0Offer{})
+
+	cbor.RegisterCborType(DealResponse{})
 }

 type Query struct {
@@ -49,14 +58,7 @@ type Deal struct {
 	Unixfs0 *Unixfs0Offer
 }

-type AcceptedResponse struct{}
-type RejectedResponse struct {
+type DealResponse struct {
+	Status  int // TODO: make this more spec compliant
 	Message string
 }
-type ErrorResponse RejectedResponse
-
-type DealResponse struct {
-	*AcceptedResponse
-	*RejectedResponse
-	*ErrorResponse
-}
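Accepted/Error/Rejected are untyped ints sharing the package namespace with the query status constants above, which the "spec compliant" TODO will eventually have to address. One wire-compatible tightening would be a typed status — a sketch, not part of the patch:

// DealStatus keeps the same integer wire values but gives them a type,
// so a query status can't be assigned to a DealResponse by accident.
type DealStatus int

const (
	DealAccepted DealStatus = iota
	DealError
	DealRejected
)

func (s DealStatus) String() string {
	switch s {
	case DealAccepted:
		return "accepted"
	case DealError:
		return "error"
	case DealRejected:
		return "rejected"
	default:
		return "unknown"
	}
}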
diff --git a/storage/sectorblocks/blocks.go b/storage/sectorblocks/blocks.go
new file mode 100644
index 000000000..324309aef
--- /dev/null
+++ b/storage/sectorblocks/blocks.go
@@ -0,0 +1,204 @@
+package sectorblocks
+
+import (
+	"context"
+	"github.com/filecoin-project/go-lotus/api"
+	"github.com/filecoin-project/go-lotus/lib/sectorbuilder"
+	"github.com/filecoin-project/go-lotus/node/modules/dtypes"
+	"github.com/ipfs/go-datastore/namespace"
+	"github.com/ipfs/go-datastore/query"
+	ipld "github.com/ipfs/go-ipld-format"
+	"sync"
+
+	"github.com/ipfs/go-cid"
+	"github.com/ipfs/go-datastore"
+	dshelp "github.com/ipfs/go-ipfs-ds-help"
+	files "github.com/ipfs/go-ipfs-files"
+	cbor "github.com/ipfs/go-ipld-cbor"
+
+	"github.com/filecoin-project/go-lotus/storage/sector"
+)
+
+type SealSerialization uint8
+
+const (
+	SerializationUnixfs0 SealSerialization = 'u'
+)
+
+var dsPrefix = datastore.NewKey("/sealedblocks")
+
+type SectorBlocks struct {
+	*sector.Store
+
+	unsealed *unsealedBlocks
+	keys     datastore.Batching
+	keyLk    sync.Mutex
+}
+
+func NewSectorBlocks(sectst *sector.Store, ds dtypes.MetadataDS, sb *sectorbuilder.SectorBuilder) *SectorBlocks {
+	sbc := &SectorBlocks{
+		Store: sectst,
+		keys:  namespace.Wrap(ds, dsPrefix),
+	}
+
+	unsealed := &unsealedBlocks{ // TODO: untangle this
+		sb: sb,
+
+		unsealed:  map[string][]byte{},
+		unsealing: map[string]chan struct{}{},
+	}
+
+	sbc.unsealed = unsealed
+	return sbc
+}
+
+type UnixfsReader interface {
+	files.File
+
+	// ReadBlock reads data from a single unixfs block. Data is nil
+	// for intermediate nodes
+	ReadBlock(context.Context) (data []byte, offset uint64, nd ipld.Node, err error)
+}
+
+type refStorer struct {
+	blockReader UnixfsReader
+	writeRef    func(cid cid.Cid, offset uint64, size uint32) error
+
+	pieceRef  string
+	remaining []byte
+}
+
+func (st *SectorBlocks) writeRef(cid cid.Cid, offset uint64, size uint32) error {
+	st.keyLk.Lock() // TODO: make this multithreaded
+	defer st.keyLk.Unlock()
+
+	v, err := st.keys.Get(dshelp.CidToDsKey(cid))
+	if err == datastore.ErrNotFound {
+		err = nil
+	}
+	if err != nil {
+		return err
+	}
+
+	var refs []api.SealedRef
+	if len(v) > 0 {
+		if err := cbor.DecodeInto(v, &refs); err != nil {
+			return err
+		}
+	}
+
+	refs = append(refs, api.SealedRef{
+		Piece:  string(SerializationUnixfs0) + cid.String(),
+		Offset: offset,
+		Size:   size,
+	})
+
+	newRef, err := cbor.DumpObject(&refs)
+	if err != nil {
+		return err
+	}
+	return st.keys.Put(dshelp.CidToDsKey(cid), newRef) // TODO: batch somehow
+}
+
+func (r *refStorer) Read(p []byte) (n int, err error) {
+	offset := 0
+	if len(r.remaining) > 0 {
+		offset += len(r.remaining)
+		read := copy(p, r.remaining)
+		if read == len(r.remaining) {
+			r.remaining = nil
+		} else {
+			r.remaining = r.remaining[read:]
+		}
+		return read, nil
+	}
+
+	for {
+		data, offset, nd, err := r.blockReader.ReadBlock(context.TODO())
+		if err != nil {
+			return 0, err
+		}
+
+		if len(data) == 0 {
+			panic("Handle intermediate nodes") // TODO: !
+		}
+
+		if err := r.writeRef(nd.Cid(), offset, uint32(len(data))); err != nil {
+			return 0, err
+		}
+
+		read := copy(p, data)
+		if read < len(data) {
+			r.remaining = data[read:]
+		}
+		// TODO: read multiple
+		return read, nil
+	}
+}
+
+func (st *SectorBlocks) AddUnixfsPiece(ref cid.Cid, r UnixfsReader, keepAtLeast uint64) (sectorID uint64, err error) {
+	size, err := r.Size()
+	if err != nil {
+		return 0, err
+	}
+
+	refst := &refStorer{blockReader: r, pieceRef: string(SerializationUnixfs0) + ref.String(), writeRef: st.writeRef}
+
+	return st.Store.AddPiece(refst.pieceRef, uint64(size), refst)
+}
+
+func (st *SectorBlocks) List() (map[cid.Cid][]api.SealedRef, error) {
+	res, err := st.keys.Query(query.Query{})
+	if err != nil {
+		return nil, err
+	}
+
+	ents, err := res.Rest()
+	if err != nil {
+		return nil, err
+	}
+
+	out := map[cid.Cid][]api.SealedRef{}
+	for _, ent := range ents {
+		refCid, err := dshelp.DsKeyToCid(datastore.RawKey(ent.Key))
+		if err != nil {
+			return nil, err
+		}
+
+		var refs []api.SealedRef
+		if err := cbor.DecodeInto(ent.Value, &refs); err != nil {
+			return nil, err
+		}
+
+		out[refCid] = refs
+	}
+
+	return out, nil
+}
+
+func (st *SectorBlocks) GetRefs(k cid.Cid) ([]api.SealedRef, error) { // TODO: track local sectors
+	ent, err := st.keys.Get(dshelp.CidToDsKey(k))
+	if err != nil {
+		return nil, err
+	}
+
+	var refs []api.SealedRef
+	if err := cbor.DecodeInto(ent, &refs); err != nil {
+		return nil, err
+	}
+
+	return refs, nil
+}
+
+func (st *SectorBlocks) Has(k cid.Cid) (bool, error) {
+	// TODO: ensure sector is still there
+	return st.keys.Has(dshelp.CidToDsKey(k))
+}
+
+func (st *SectorBlocks) SealedBlockstore(approveUnseal func() error) *SectorBlockStore {
+	return &SectorBlockStore{
+		//local:        nil, // TODO: Pass staging
+		sectorBlocks:  st,
+		approveUnseal: approveUnseal,
+	}
+}
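GetRefs is the index lookup behind both the query handler and the blockstore: one CID can be referenced by several sealed pieces. A sketch of an availability check built on it, mirroring what HandleQueryStream does with refs[0] (the helper is illustrative, not part of the patch):

// available reports whether a cid is retrievable from sealed sectors,
// and the size of the first piece that holds it.
func available(st *sectorblocks.SectorBlocks, c cid.Cid) (uint64, bool, error) {
	refs, err := st.GetRefs(c)
	if err != nil {
		return 0, false, err // note: a datastore not-found error also lands here
	}
	if len(refs) == 0 {
		return 0, false, nil
	}
	// Any ref will do; HandleQueryStream simply uses refs[0].
	return uint64(refs[0].Size), true, nil
}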
diff --git a/storage/sectorblocks/blockstore.go b/storage/sectorblocks/blockstore.go
index 9cecf9a85..e69c16e62 100644
--- a/storage/sectorblocks/blockstore.go
+++ b/storage/sectorblocks/blockstore.go
@@ -2,177 +2,77 @@ package sectorblocks

 import (
 	"context"
-	"github.com/filecoin-project/go-lotus/api"
-	"github.com/filecoin-project/go-lotus/node/modules/dtypes"
-	"github.com/ipfs/go-datastore/namespace"
-	"github.com/ipfs/go-datastore/query"
-	"sync"
-
+	"github.com/ipfs/go-block-format"
 	"github.com/ipfs/go-cid"
-	"github.com/ipfs/go-datastore"
-	dshelp "github.com/ipfs/go-ipfs-ds-help"
-	files "github.com/ipfs/go-ipfs-files"
-	cbor "github.com/ipfs/go-ipld-cbor"
-
-	"github.com/filecoin-project/go-lotus/storage/sector"
+	blockstore "github.com/ipfs/go-ipfs-blockstore"
 )

-type SealSerialization uint8
+type SectorBlockStore struct {
+	// local blockstore.Blockstore // staging before GC // TODO: Pass staging
+	sectorBlocks *SectorBlocks

-const (
-	SerializationUnixfs0 SealSerialization = 'u'
-)
-
-var dsPrefix = datastore.NewKey("/sealedblocks")
-
-type SectorBlocks struct {
-	*sector.Store
-
-	keys  datastore.Batching
-	keyLk sync.Mutex
+	approveUnseal func() error
 }

-func NewSectorBlocks(sectst *sector.Store, ds dtypes.MetadataDS) *SectorBlocks {
-	return &SectorBlocks{
-		Store: sectst,
-		keys:  namespace.Wrap(ds, dsPrefix),
-	}
+func (s *SectorBlockStore) DeleteBlock(cid.Cid) error {
+	panic("not supported")
+}
+
+func (s *SectorBlockStore) GetSize(cid.Cid) (int, error) {
+	panic("not supported")
 }

-type UnixfsReader interface {
-	files.File
-
-	// ReadBlock reads data from a single unixfs block. Data is nil
-	// for intermediate nodes
-	ReadBlock(context.Context) (data []byte, offset uint64, cid cid.Cid, err error)
+func (s *SectorBlockStore) Put(blocks.Block) error {
+	panic("not supported")
 }

-type refStorer struct {
-	blockReader UnixfsReader
-	writeRef    func(cid cid.Cid, offset uint64, size uint32) error
-
-	pieceRef  string
-	remaining []byte
+func (s *SectorBlockStore) PutMany([]blocks.Block) error {
+	panic("not supported")
 }

-func (st *SectorBlocks) writeRef(cid cid.Cid, offset uint64, size uint32) error {
-	st.keyLk.Lock() // TODO: make this multithreaded
-	defer st.keyLk.Unlock()
+func (s *SectorBlockStore) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) {
+	panic("not supported")
+}

-	v, err := st.keys.Get(dshelp.CidToDsKey(cid))
-	if err == datastore.ErrNotFound {
-		err = nil
-	}
+func (s *SectorBlockStore) HashOnRead(enabled bool) {
+	panic("not supported")
+}
+
+func (s *SectorBlockStore) Has(c cid.Cid) (bool, error) {
+	/*has, err := s.local.Has(c) // TODO: Pass staging
 	if err != nil {
-		return err
+		return false, err
 	}
+	if has {
+		return true, nil
+	}*/

-	var refs []api.SealedRef
-	if len(v) > 0 {
-		if err := cbor.DecodeInto(v, &refs); err != nil {
-			return err
-		}
+	return s.sectorBlocks.Has(c)
+}
+
+func (s *SectorBlockStore) Get(c cid.Cid) (blocks.Block, error) {
+	/*val, err := s.local.Get(c) // TODO: Pass staging
+	if err == nil {
+		return val, nil
 	}
+	if err != blockstore.ErrNotFound {
+		return nil, err
+	}*/

-	refs = append(refs, api.SealedRef{
-		Piece:  string(SerializationUnixfs0) + cid.String(),
-		Offset: offset,
-		Size:   size,
-	})
-
-	newRef, err := cbor.DumpObject(&refs)
+	refs, err := s.sectorBlocks.GetRefs(c)
 	if err != nil {
-		return err
+		return nil, err
 	}
-	return st.keys.Put(dshelp.CidToDsKey(cid), newRef) // TODO: batch somehow
-}
-
-func (r *refStorer) Read(p []byte) (n int, err error) {
-	offset := 0
-	if len(r.remaining) > 0 {
-		offset += len(r.remaining)
-		read := copy(p, r.remaining)
-		if read == len(r.remaining) {
-			r.remaining = nil
-		} else {
-			r.remaining = r.remaining[read:]
-		}
-		return read, nil
+	if len(refs) == 0 {
+		return nil, blockstore.ErrNotFound
 	}

-	for {
-		data, offset, cid, err := r.blockReader.ReadBlock(context.TODO())
-		if err != nil {
-			return 0, err
-		}
-
-		if len(data) == 0 {
-			panic("Handle intermediate nodes") // TODO: !
-		}
-
-		if err := r.writeRef(cid, offset, uint32(len(data))); err != nil {
-			return 0, err
-		}
-
-		read := copy(p, data)
-		if read < len(data) {
-			r.remaining = data[read:]
-		}
-		// TODO: read multiple
-		return read, nil
-	}
-}
-
-func (st *SectorBlocks) AddUnixfsPiece(ref cid.Cid, r UnixfsReader, keepAtLeast uint64) (sectorID uint64, err error) {
-	size, err := r.Size()
-	if err != nil {
-		return 0, err
-	}
-
-	refst := &refStorer{blockReader: r, pieceRef: string(SerializationUnixfs0) + ref.String(), writeRef: st.writeRef}
-
-	return st.Store.AddPiece(refst.pieceRef, uint64(size), refst)
-}
-
-func (st *SectorBlocks) List() (map[cid.Cid][]api.SealedRef, error) {
-	res, err := st.keys.Query(query.Query{})
+	data, err := s.sectorBlocks.unsealed.getRef(context.TODO(), refs, s.approveUnseal)
 	if err != nil {
 		return nil, err
 	}

-	ents, err := res.Rest()
-	if err != nil {
-		return nil, err
-	}
-
-	out := map[cid.Cid][]api.SealedRef{}
-	for _, ent := range ents {
-		refCid, err := dshelp.DsKeyToCid(datastore.RawKey(ent.Key))
-		if err != nil {
-			return nil, err
-		}
-
-		var refs []api.SealedRef
-		if err := cbor.DecodeInto(ent.Value, &refs); err != nil {
-			return nil, err
-		}
-
-		out[refCid] = refs
-	}
-
-	return out, nil
+	return blocks.NewBlockWithCid(data, c)
 }

-func (st *SectorBlocks) GetRefs(k cid.Cid) ([]api.SealedRef, error) { // TODO: track unsealed sectors
-	ent, err := st.keys.Get(dshelp.CidToDsKey(k))
-	if err != nil {
-		return nil, err
-	}
-
-	var refs []api.SealedRef
-	if err := cbor.DecodeInto(ent, &refs); err != nil {
-		return nil, err
-	}
-
-	return refs, nil
-}
+var _ blockstore.Blockstore = &SectorBlockStore{}
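Only the read path is implemented because retrieval never writes; the store exists to be wrapped by a DAG service, exactly as HandleDealStream does. A minimal sketch of that composition (loadRoot is illustrative, not part of the patch):

// loadRoot resolves a file root purely from sealed sectors: a Get on the
// store triggers GetRefs, then (if needed) an unseal, then block assembly.
func loadRoot(ctx context.Context, st *sectorblocks.SectorBlocks, root cid.Cid) (ipld.Node, error) {
	bstore := st.SealedBlockstore(func() error {
		return nil // auto-approve unsealing for this sketch
	})
	// nil exchange: serve locally, never fall back to the network
	dserv := merkledag.NewDAGService(blockservice.New(bstore, nil))
	return dserv.Get(ctx, root)
}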
diff --git a/storage/sectorblocks/unsealed.go b/storage/sectorblocks/unsealed.go
new file mode 100644
index 000000000..d2cbfa820
--- /dev/null
+++ b/storage/sectorblocks/unsealed.go
@@ -0,0 +1,98 @@
+package sectorblocks
+
+import (
+	"context"
+	"sync"
+
+	logging "github.com/ipfs/go-log"
+
+	"github.com/filecoin-project/go-lotus/api"
+	"github.com/filecoin-project/go-lotus/lib/sectorbuilder"
+)
+
+var log = logging.Logger("sectorblocks")
+
+type unsealedBlocks struct {
+	lk sync.Mutex
+	sb *sectorbuilder.SectorBuilder
+
+	// TODO: Treat this as some sort of cache, one with rather aggressive GC
+	// TODO: This REALLY, REALLY needs to be on-disk
+	unsealed map[string][]byte
+
+	unsealing map[string]chan struct{}
+}
+
+func (ub *unsealedBlocks) getRef(ctx context.Context, refs []api.SealedRef, approveUnseal func() error) ([]byte, error) {
+	var best api.SealedRef
+
+	ub.lk.Lock()
+	for _, ref := range refs {
+		b, ok := ub.unsealed[ref.Piece]
+		if ok {
+			ub.lk.Unlock()
+			return b[ref.Offset : ref.Offset+uint64(ref.Size)], nil // TODO: check slice math
+		}
+
+		// TODO: pick unsealing based on how long it's running (or just select
+		//  all relevant, usually it'll be just one)
+		_, ok = ub.unsealing[ref.Piece]
+		if ok {
+			best = ref
+			break
+		}
+
+		best = ref
+	}
+	ub.lk.Unlock()
+
+	b, err := ub.maybeUnseal(ctx, best.Piece, approveUnseal)
+	if err != nil {
+		return nil, err
+	}
+
+	return b[best.Offset : best.Offset+uint64(best.Size)], nil // TODO: check slice math
+}
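The unsealing cache relies on a channel-per-key single-flight pattern: the first caller for a piece does the expensive unseal, later callers block on the channel until the result is cached. The idiom in isolation (a generic sketch; maybeUnseal below is the concrete instance):

// single runs work() at most once per key; concurrent callers share the result.
func single(lk *sync.Mutex, inflight map[string]chan struct{}, done map[string][]byte,
	key string, work func() []byte) []byte {
	lk.Lock()
	if b, ok := done[key]; ok { // already computed
		lk.Unlock()
		return b
	}
	if ch, ok := inflight[key]; ok { // someone else is computing it
		lk.Unlock()
		<-ch // wait for the winner to close the channel
		lk.Lock()
		b := done[key]
		lk.Unlock()
		return b
	}
	ch := make(chan struct{})
	inflight[key] = ch
	lk.Unlock()

	b := work() // expensive: e.g. unsealing a sector

	lk.Lock()
	done[key] = b
	close(ch)
	delete(inflight, key)
	lk.Unlock()
	return b
}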
+
+func (ub *unsealedBlocks) maybeUnseal(ctx context.Context, pieceKey string, approveUnseal func() error) ([]byte, error) {
+	ub.lk.Lock()
+	defer ub.lk.Unlock()
+
+	out, ok := ub.unsealed[pieceKey]
+	if ok {
+		return out, nil
+	}
+
+	wait, ok := ub.unsealing[pieceKey]
+	if ok {
+		ub.lk.Unlock()
+		select {
+		case <-wait:
+			ub.lk.Lock()
+			// TODO: make sure this is not racy with gc when it's implemented
+			return ub.unsealed[pieceKey], nil
+		case <-ctx.Done():
+			ub.lk.Lock()
+			return nil, ctx.Err()
+		}
+	}
+
+	// TODO: doing this under a lock is suboptimal.. but simpler
+	err := approveUnseal()
+	if err != nil {
+		return nil, err
+	}
+
+	ub.unsealing[pieceKey] = make(chan struct{})
+	ub.lk.Unlock()
+
+	log.Infof("Unsealing piece '%s'", pieceKey)
+	data, err := ub.sb.ReadPieceFromSealedSector(pieceKey)
+	ub.lk.Lock() // reacquire before any return so the deferred Unlock stays balanced
+	if err != nil {
+		// TODO: signal the failure to waiters on the unsealing channel
+		return nil, err
+	}
+
+	ub.unsealed[pieceKey] = data
+	close(ub.unsealing[pieceKey])
+	return data, nil
+}
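Taken together, the pieces introduced in this patch line up as follows (an illustrative summary using only names from the patch, not a normative protocol description):

//	client (retrieval.Client)          miner (retrieval.Miner)
//	-------------------------          -----------------------
//	Query ---------------------------> HandleQueryStream
//	                                     GetRefs -> QueryResponse{Size, MinPrice}
//	RetrieveUnixfs loop:
//	  Deal{Unixfs0} -----------------> HandleDealStream
//	                                     SealedBlockstore -> unsealedBlocks.maybeUnseal
//	  DealResponse{Accepted} <-------    then UnixfsReader.ReadBlock per block
//	  msgio varint frames    <-------    pb.Message_Block{Prefix, Data}
//	  offset += toFetch; repeat until offset == size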