Retrieval works!

This commit is contained in:
Łukasz Magiera 2019-08-27 20:45:21 +02:00
parent 60eedb699e
commit be30bc79a5
15 changed files with 576 additions and 185 deletions

View File

@ -93,6 +93,7 @@ type FullNode interface {
ClientStartDeal(ctx context.Context, data cid.Cid, miner address.Address, price types.BigInt, blocksDuration uint64) (*cid.Cid, error) ClientStartDeal(ctx context.Context, data cid.Cid, miner address.Address, price types.BigInt, blocksDuration uint64) (*cid.Cid, error)
ClientHasLocal(ctx context.Context, root cid.Cid) (bool, error) ClientHasLocal(ctx context.Context, root cid.Cid) (bool, error)
ClientFindData(ctx context.Context, root cid.Cid) ([]QueryOffer, error) // TODO: specify serialization mode we want (defaults to unixfs for now) ClientFindData(ctx context.Context, root cid.Cid) ([]QueryOffer, error) // TODO: specify serialization mode we want (defaults to unixfs for now)
ClientRetrieve(ctx context.Context, order RetrievalOrder) error // TODO: maybe just allow putting this straight into some file
// ClientUnimport removes references to the specified file from filestore // ClientUnimport removes references to the specified file from filestore
//ClientUnimport(path string) //ClientUnimport(path string)
@ -196,9 +197,30 @@ type SealedRef struct {
type QueryOffer struct { type QueryOffer struct {
Err string Err string
Root cid.Cid
Size uint64 Size uint64
MinPrice types.BigInt MinPrice types.BigInt
Miner address.Address Miner address.Address
MinerPeerID peer.ID MinerPeerID peer.ID
} }
func (o *QueryOffer) Order() RetrievalOrder {
return RetrievalOrder{
Root: o.Root,
Size: o.Size,
Miner: o.Miner,
MinerPeerID: o.MinerPeerID,
}
}
type RetrievalOrder struct {
// TODO: make this loss unixfs specific
Root cid.Cid
Size uint64
// TODO: support offset
Miner address.Address
MinerPeerID peer.ID
}

View File

@ -73,6 +73,7 @@ type FullNodeStruct struct {
ClientHasLocal func(ctx context.Context, root cid.Cid) (bool, error) `perm:"write"` ClientHasLocal func(ctx context.Context, root cid.Cid) (bool, error) `perm:"write"`
ClientFindData func(ctx context.Context, root cid.Cid) ([]QueryOffer, error) `perm:"read"` ClientFindData func(ctx context.Context, root cid.Cid) ([]QueryOffer, error) `perm:"read"`
ClientStartDeal func(ctx context.Context, data cid.Cid, miner address.Address, price types.BigInt, blocksDuration uint64) (*cid.Cid, error) `perm:"admin"` ClientStartDeal func(ctx context.Context, data cid.Cid, miner address.Address, price types.BigInt, blocksDuration uint64) (*cid.Cid, error) `perm:"admin"`
ClientRetrieve func(ctx context.Context, order RetrievalOrder) error `perm:"admin"`
StateMinerSectors func(context.Context, address.Address) ([]*SectorInfo, error) `perm:"read"` StateMinerSectors func(context.Context, address.Address) ([]*SectorInfo, error) `perm:"read"`
StateMinerProvingSet func(context.Context, address.Address) ([]*SectorInfo, error) `perm:"read"` StateMinerProvingSet func(context.Context, address.Address) ([]*SectorInfo, error) `perm:"read"`
@ -166,6 +167,10 @@ func (c *FullNodeStruct) ClientStartDeal(ctx context.Context, data cid.Cid, mine
return c.Internal.ClientStartDeal(ctx, data, miner, price, blocksDuration) return c.Internal.ClientStartDeal(ctx, data, miner, price, blocksDuration)
} }
func (c *FullNodeStruct) ClientRetrieve(ctx context.Context, order RetrievalOrder) error {
return c.Internal.ClientRetrieve(ctx, order)
}
func (c *FullNodeStruct) MpoolPending(ctx context.Context, ts *types.TipSet) ([]*types.SignedMessage, error) { func (c *FullNodeStruct) MpoolPending(ctx context.Context, ts *types.TipSet) ([]*types.SignedMessage, error) {
return c.Internal.MpoolPending(ctx, ts) return c.Internal.MpoolPending(ctx, ts)
} }

View File

@ -28,7 +28,7 @@ func HandleIncomingBlocks(ctx context.Context, bsub *pubsub.Subscription, s *cha
} }
go func() { go func() {
log.Info("about to fetch messages for block from pubsub") log.Debug("about to fetch messages for block from pubsub")
bmsgs, err := s.Bsync.FetchMessagesByCids(context.TODO(), blk.BlsMessages) bmsgs, err := s.Bsync.FetchMessagesByCids(context.TODO(), blk.BlsMessages)
if err != nil { if err != nil {
log.Errorf("failed to fetch all bls messages for block received over pubusb: %s", err) log.Errorf("failed to fetch all bls messages for block received over pubusb: %s", err)
@ -41,7 +41,7 @@ func HandleIncomingBlocks(ctx context.Context, bsub *pubsub.Subscription, s *cha
return return
} }
log.Info("inform new block over pubsub") log.Debug("inform new block over pubsub")
s.InformNewBlock(msg.GetFrom(), &types.FullBlock{ s.InformNewBlock(msg.GetFrom(), &types.FullBlock{
Header: blk.Header, Header: blk.Header,
BlsMessages: bmsgs, BlsMessages: bmsgs,

View File

@ -88,7 +88,7 @@ func (syncer *Syncer) InformNewHead(from peer.ID, fts *store.FullTipSet) {
} }
if from == syncer.self { if from == syncer.self {
// TODO: this is kindof a hack... // TODO: this is kindof a hack...
log.Infof("got block from ourselves") log.Debug("got block from ourselves")
if err := syncer.Sync(fts); err != nil { if err := syncer.Sync(fts); err != nil {
log.Errorf("failed to sync our own block: %s", err) log.Errorf("failed to sync our own block: %s", err)

View File

@ -181,7 +181,7 @@ var clientRetrieveCmd = &cli.Command{
// Check if we already have this data locally // Check if we already have this data locally
has, err := api.ClientHasLocal(ctx, file) /*has, err := api.ClientHasLocal(ctx, file)
if err != nil { if err != nil {
return err return err
} }
@ -189,20 +189,20 @@ var clientRetrieveCmd = &cli.Command{
if has { if has {
fmt.Println("Success: Already in local storage") fmt.Println("Success: Already in local storage")
return nil return nil
} }*/ // TODO: uncomment before merge
_, err = api.ClientFindData(ctx, file) offers, err := api.ClientFindData(ctx, file)
if err != nil { if err != nil {
return err return err
} }
// Find miner which may have this data // TODO: parse offer strings from `client find`, make this smarter
// Get merkle proofs (intermediate nodes) order := offers[0].Order()
err = api.ClientRetrieve(ctx, order)
// if acceptable, make retrieval deals to get data if err == nil {
// done fmt.Println("Success")
}
panic("TODO") return err
}, },
} }

1
go.sum
View File

@ -485,6 +485,7 @@ github.com/prometheus/client_model v0.0.0-20190129233127-fd36f4220a90/go.mod h1:
github.com/prometheus/common v0.2.0 h1:kUZDBDTdBVBYBj5Tmh2NZLlF60mfjA27rM34b+cVwNU= github.com/prometheus/common v0.2.0 h1:kUZDBDTdBVBYBj5Tmh2NZLlF60mfjA27rM34b+cVwNU=
github.com/prometheus/common v0.2.0/go.mod h1:TNfzLD0ON7rHzMJeJkieUDPYmFC7Snx/y86RQel1bk4= github.com/prometheus/common v0.2.0/go.mod h1:TNfzLD0ON7rHzMJeJkieUDPYmFC7Snx/y86RQel1bk4=
github.com/prometheus/common v0.4.1/go.mod h1:TNfzLD0ON7rHzMJeJkieUDPYmFC7Snx/y86RQel1bk4= github.com/prometheus/common v0.4.1/go.mod h1:TNfzLD0ON7rHzMJeJkieUDPYmFC7Snx/y86RQel1bk4=
github.com/prometheus/common v0.6.0 h1:kRhiuYSXR3+uv2IbVbZhUxK5zVD/2pp3Gd2PpvPkpEo=
github.com/prometheus/common v0.6.0/go.mod h1:eBmuwkDJBwy6iBfxCBob6t6dR6ENT/y+J+Zk0j9GMYc= github.com/prometheus/common v0.6.0/go.mod h1:eBmuwkDJBwy6iBfxCBob6t6dR6ENT/y+J+Zk0j9GMYc=
github.com/prometheus/procfs v0.0.0-20181005140218-185b4288413d/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk= github.com/prometheus/procfs v0.0.0-20181005140218-185b4288413d/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk=
github.com/prometheus/procfs v0.0.0-20190117184657-bf6a532e95b1/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk= github.com/prometheus/procfs v0.0.0-20190117184657-bf6a532e95b1/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk=

View File

@ -193,7 +193,7 @@ func (m *Miner) GetBestMiningCandidate() (*MiningBase, error) {
} }
func (m *Miner) mineOne(ctx context.Context, base *MiningBase) (*chain.BlockMsg, error) { func (m *Miner) mineOne(ctx context.Context, base *MiningBase) (*chain.BlockMsg, error) {
log.Info("attempting to mine a block on:", base.ts.Cids()) log.Debug("attempting to mine a block on:", base.ts.Cids())
ticket, err := m.scratchTicket(ctx, base) ticket, err := m.scratchTicket(ctx, base)
if err != nil { if err != nil {
return nil, errors.Wrap(err, "scratching ticket failed") return nil, errors.Wrap(err, "scratching ticket failed")

View File

@ -178,7 +178,7 @@ func (a *ClientAPI) ClientImport(ctx context.Context, path string) (cid.Cid, err
NoCopy: true, NoCopy: true,
} }
db, err := params.New(chunker.NewSizeSplitter(file, build.UnixfsChunkSize)) db, err := params.New(chunker.NewSizeSplitter(file, int64(build.UnixfsChunkSize)))
if err != nil { if err != nil {
return cid.Undef, err return cid.Undef, err
} }
@ -218,3 +218,7 @@ func (a *ClientAPI) ClientListImports(ctx context.Context) ([]api.Import, error)
}) })
} }
} }
func (a *ClientAPI) ClientRetrieve(ctx context.Context, order api.RetrievalOrder) error {
return a.Retrieval.RetrieveUnixfs(ctx, order.Root, order.Size, order.MinerPeerID, order.Miner)
}

View File

@ -90,7 +90,8 @@ func StorageMiner(mctx helpers.MetricsCtx, lc fx.Lifecycle, api api.FullNode, h
func HandleRetrieval(host host.Host, lc fx.Lifecycle, m *retrieval.Miner) { func HandleRetrieval(host host.Host, lc fx.Lifecycle, m *retrieval.Miner) {
lc.Append(fx.Hook{ lc.Append(fx.Hook{
OnStart: func(context.Context) error { OnStart: func(context.Context) error {
host.SetStreamHandler(retrieval.QueryProtocolID, m.HandleStream) host.SetStreamHandler(retrieval.QueryProtocolID, m.HandleQueryStream)
host.SetStreamHandler(retrieval.ProtocolID, m.HandleDealStream)
return nil return nil
}, },
}) })

View File

@ -4,16 +4,16 @@ import (
"context" "context"
"io/ioutil" "io/ioutil"
blocks "github.com/ipfs/go-block-format"
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-msgio"
"golang.org/x/xerrors"
pb "github.com/ipfs/go-bitswap/message/pb" pb "github.com/ipfs/go-bitswap/message/pb"
blocks "github.com/ipfs/go-block-format"
"github.com/ipfs/go-cid" "github.com/ipfs/go-cid"
cbor "github.com/ipfs/go-ipld-cbor" cbor "github.com/ipfs/go-ipld-cbor"
logging "github.com/ipfs/go-log" logging "github.com/ipfs/go-log"
"github.com/libp2p/go-libp2p-core/host" "github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-msgio"
"golang.org/x/xerrors"
"github.com/filecoin-project/go-lotus/api" "github.com/filecoin-project/go-lotus/api"
"github.com/filecoin-project/go-lotus/build" "github.com/filecoin-project/go-lotus/build"
@ -62,6 +62,7 @@ func (c *Client) Query(ctx context.Context, p discovery.RetrievalPeer, data cid.
} }
return api.QueryOffer{ return api.QueryOffer{
Root: data,
Size: resp.Size, Size: resp.Size,
MinPrice: resp.MinPrice, MinPrice: resp.MinPrice,
Miner: p.Address, // TODO: check Miner: p.Address, // TODO: check
@ -102,13 +103,15 @@ func (c *Client) RetrieveUnixfs(ctx context.Context, root cid.Cid, size uint64,
stream: s, stream: s,
root: root, root: root,
offset: 0, // TODO: check how much data we have locally offset: 0, // TODO: Check how much data we have locally
// TODO: Support in handler
// TODO: Allow client to specify this
windowSize: build.UnixfsChunkSize, windowSize: build.UnixfsChunkSize,
verifier: &OptimisticVerifier{}, // TODO: Use a real verifier verifier: &OptimisticVerifier{}, // TODO: Use a real verifier
} }
for { for cst.offset != size {
toFetch := cst.windowSize toFetch := cst.windowSize
if toFetch+cst.offset > size { if toFetch+cst.offset > size {
toFetch = size - cst.offset toFetch = size - cst.offset
@ -118,7 +121,11 @@ func (c *Client) RetrieveUnixfs(ctx context.Context, root cid.Cid, size uint64,
if err != nil { if err != nil {
return err return err
} }
cst.offset += toFetch
} }
log.Info("RETRIEVE SUCCESSFUL")
return nil
} }
func (cst *clientStream) doOneExchange(toFetch uint64) error { func (cst *clientStream) doOneExchange(toFetch uint64) error {
@ -134,18 +141,19 @@ func (cst *clientStream) doOneExchange(toFetch uint64) error {
var resp DealResponse var resp DealResponse
if err := cborrpc.ReadCborRPC(cst.stream, &resp); err != nil { if err := cborrpc.ReadCborRPC(cst.stream, &resp); err != nil {
log.Error(err)
return err return err
} }
if resp.AcceptedResponse == nil { if resp.Status != Accepted {
cst.windowSize = build.UnixfsChunkSize cst.windowSize = build.UnixfsChunkSize
// TODO: apply some 'penalty' to miner 'reputation' (needs to be the same in both cases) // TODO: apply some 'penalty' to miner 'reputation' (needs to be the same in both cases)
if resp.ErrorResponse != nil { if resp.Status == Error {
return xerrors.Errorf("storage deal error: %s", resp.ErrorResponse.Message) return xerrors.Errorf("storage deal error: %s", resp.Message)
} }
if resp.RejectedResponse != nil { if resp.Status == Rejected {
return xerrors.Errorf("storage deal rejected: %s", resp.RejectedResponse.Message) return xerrors.Errorf("storage deal rejected: %s", resp.Message)
} }
return xerrors.New("storage deal response had no Accepted section") return xerrors.New("storage deal response had no Accepted section")
} }

View File

@ -1,24 +1,37 @@
package retrieval package retrieval
import ( import (
"github.com/libp2p/go-libp2p-core/network" "context"
"github.com/filecoin-project/go-lotus/build"
"github.com/ipfs/go-cid"
"github.com/libp2p/go-msgio"
"golang.org/x/xerrors"
"github.com/filecoin-project/go-lotus/chain/types" "github.com/filecoin-project/go-lotus/chain/types"
"github.com/filecoin-project/go-lotus/lib/cborrpc" "github.com/filecoin-project/go-lotus/lib/cborrpc"
"github.com/filecoin-project/go-lotus/storage/sectorblocks" "github.com/filecoin-project/go-lotus/storage/sectorblocks"
pb "github.com/ipfs/go-bitswap/message/pb"
"github.com/ipfs/go-blockservice"
"github.com/ipfs/go-merkledag"
unixfile "github.com/ipfs/go-unixfs/file"
"github.com/libp2p/go-libp2p-core/network"
) )
type Miner struct { type Miner struct {
sectorBlocks *sectorblocks.SectorBlocks sectorBlocks *sectorblocks.SectorBlocks
pricePerByte types.BigInt
// TODO: Unseal price
} }
func NewMiner(sblks *sectorblocks.SectorBlocks) *Miner { func NewMiner(sblks *sectorblocks.SectorBlocks) *Miner {
return &Miner{ return &Miner{
sectorBlocks: sblks, sectorBlocks: sblks,
pricePerByte: types.NewInt(2), // TODO: allow setting
} }
} }
func (m *Miner) HandleStream(stream network.Stream) { func (m *Miner) HandleQueryStream(stream network.Stream) {
defer stream.Close() defer stream.Close()
var query Query var query Query
@ -40,7 +53,7 @@ func (m *Miner) HandleStream(stream network.Stream) {
answer.Status = Available answer.Status = Available
// TODO: get price, look for already unsealed ref to reduce work // TODO: get price, look for already unsealed ref to reduce work
answer.MinPrice = types.NewInt(uint64(refs[0].Size) * 2) // TODO: Get this from somewhere answer.MinPrice = types.BigMul(types.NewInt(uint64(refs[0].Size)), m.pricePerByte)
answer.Size = uint64(refs[0].Size) // TODO: verify on intermediate answer.Size = uint64(refs[0].Size) // TODO: verify on intermediate
} }
@ -49,3 +62,136 @@ func (m *Miner) HandleStream(stream network.Stream) {
return return
} }
} }
func writeErr(stream network.Stream, err error) {
log.Errorf("Retrieval deal error: %s", err)
_ = cborrpc.WriteCborRPC(stream, DealResponse{
Status: Error,
Message: err.Error(),
})
}
func (m *Miner) HandleDealStream(stream network.Stream) { // TODO: should we block in stream handlers
defer stream.Close()
var ufsr sectorblocks.UnixfsReader
var open cid.Cid
var at uint64
var size uint64
for {
var deal Deal
if err := cborrpc.ReadCborRPC(stream, &deal); err != nil {
return
}
if deal.Unixfs0 == nil {
writeErr(stream, xerrors.New("unknown deal type"))
return
}
// TODO: Verify payment, check how much we can send based on that
// Or reject (possibly returning the payment to retain reputation with the client)
bstore := m.sectorBlocks.SealedBlockstore(func() error {
return nil // TODO: approve unsealing based on amount paid
})
if open != deal.Unixfs0.Root || at != deal.Unixfs0.Offset {
if deal.Unixfs0.Offset != 0 {
// TODO: Implement SeekBlock (like ReadBlock) in go-unixfs
writeErr(stream, xerrors.New("sending merkle proofs for nonzero offset not supported yet"))
return
}
at = deal.Unixfs0.Offset
ds := merkledag.NewDAGService(blockservice.New(bstore, nil))
rootNd, err := ds.Get(context.TODO(), deal.Unixfs0.Root)
if err != nil {
writeErr(stream, err)
return
}
fsr, err := unixfile.NewUnixfsFile(context.TODO(), ds, rootNd)
if err != nil {
writeErr(stream, err)
return
}
var ok bool
ufsr, ok = fsr.(sectorblocks.UnixfsReader)
if !ok {
writeErr(stream, xerrors.Errorf("file %s didn't implement sectorblocks.UnixfsReader", deal.Unixfs0.Root))
return
}
isize, err := ufsr.Size()
if err != nil {
writeErr(stream, err)
return
}
size = uint64(isize)
}
if deal.Unixfs0.Offset + deal.Unixfs0.Size > size {
writeErr(stream, xerrors.Errorf("tried to read too much %d+%d > %d", deal.Unixfs0.Offset, deal.Unixfs0.Size, size))
return
}
resp := DealResponse{
Status: Accepted,
}
if err := cborrpc.WriteCborRPC(stream, resp); err != nil {
log.Errorf("Retrieval query: Write Accepted resp: %s", err)
return
}
buf := make([]byte, network.MessageSizeMax)
msgw := msgio.NewVarintWriter(stream)
blocksToSend := (deal.Unixfs0.Size + build.UnixfsChunkSize - 1) / build.UnixfsChunkSize
for i := uint64(0); i < blocksToSend; {
data, offset, nd, err := ufsr.ReadBlock(context.TODO())
if err != nil {
writeErr(stream, err)
return
}
log.Infof("sending block for a deal: %s", nd.Cid())
if offset != deal.Unixfs0.Offset {
writeErr(stream, xerrors.Errorf("ReadBlock on wrong offset: want %d, got %d", deal.Unixfs0.Offset, offset))
return
}
/*if uint64(len(data)) != deal.Unixfs0.Size { // TODO: Fix for internal nodes (and any other node too)
writeErr(stream, xerrors.Errorf("ReadBlock data with wrong size: want %d, got %d", deal.Unixfs0.Size, len(data)))
return
}*/
block := pb.Message_Block{
Prefix: nd.Cid().Prefix().Bytes(),
Data: nd.RawData(),
}
n, err := block.MarshalTo(buf)
if err != nil {
writeErr(stream, err)
return
}
if err := msgw.WriteMsg(buf[:n]); err != nil {
log.Error(err)
return
}
if len(data) > 0 { // don't count internal nodes
i++
}
}
// TODO: set `at`
}
}

View File

@ -17,11 +17,20 @@ const (
Unavailable Unavailable
) )
const (
Accepted = iota
Error
Rejected
)
func init() { func init() {
cbor.RegisterCborType(Deal{}) cbor.RegisterCborType(Deal{})
cbor.RegisterCborType(Query{}) cbor.RegisterCborType(Query{})
cbor.RegisterCborType(QueryResponse{}) cbor.RegisterCborType(QueryResponse{})
cbor.RegisterCborType(Unixfs0Offer{})
cbor.RegisterCborType(DealResponse{})
} }
type Query struct { type Query struct {
@ -49,14 +58,7 @@ type Deal struct {
Unixfs0 *Unixfs0Offer Unixfs0 *Unixfs0Offer
} }
type AcceptedResponse struct{} type DealResponse struct {
type RejectedResponse struct { Status int // TODO: make this more spec complainant
Message string Message string
} }
type ErrorResponse RejectedResponse
type DealResponse struct {
*AcceptedResponse
*RejectedResponse
*ErrorResponse
}

View File

@ -0,0 +1,204 @@
package sectorblocks
import (
"context"
"github.com/filecoin-project/go-lotus/api"
"github.com/filecoin-project/go-lotus/lib/sectorbuilder"
"github.com/filecoin-project/go-lotus/node/modules/dtypes"
"github.com/ipfs/go-datastore/namespace"
"github.com/ipfs/go-datastore/query"
ipld "github.com/ipfs/go-ipld-format"
"sync"
"github.com/ipfs/go-cid"
"github.com/ipfs/go-datastore"
dshelp "github.com/ipfs/go-ipfs-ds-help"
files "github.com/ipfs/go-ipfs-files"
cbor "github.com/ipfs/go-ipld-cbor"
"github.com/filecoin-project/go-lotus/storage/sector"
)
type SealSerialization uint8
const (
SerializationUnixfs0 SealSerialization = 'u'
)
var dsPrefix = datastore.NewKey("/sealedblocks")
type SectorBlocks struct {
*sector.Store
unsealed *unsealedBlocks
keys datastore.Batching
keyLk sync.Mutex
}
func NewSectorBlocks(sectst *sector.Store, ds dtypes.MetadataDS, sb *sectorbuilder.SectorBuilder) *SectorBlocks {
sbc := &SectorBlocks{
Store: sectst,
keys: namespace.Wrap(ds, dsPrefix),
}
unsealed := &unsealedBlocks{ // TODO: untangle this
sb: sb,
unsealed: map[string][]byte{},
unsealing: map[string]chan struct{}{},
}
sbc.unsealed = unsealed
return sbc
}
type UnixfsReader interface {
files.File
// ReadBlock reads data from a single unixfs block. Data is nil
// for intermediate nodes
ReadBlock(context.Context) (data []byte, offset uint64, nd ipld.Node, err error)
}
type refStorer struct {
blockReader UnixfsReader
writeRef func(cid cid.Cid, offset uint64, size uint32) error
pieceRef string
remaining []byte
}
func (st *SectorBlocks) writeRef(cid cid.Cid, offset uint64, size uint32) error {
st.keyLk.Lock() // TODO: make this multithreaded
defer st.keyLk.Unlock()
v, err := st.keys.Get(dshelp.CidToDsKey(cid))
if err == datastore.ErrNotFound {
err = nil
}
if err != nil {
return err
}
var refs []api.SealedRef
if len(v) > 0 {
if err := cbor.DecodeInto(v, &refs); err != nil {
return err
}
}
refs = append(refs, api.SealedRef{
Piece: string(SerializationUnixfs0) + cid.String(),
Offset: offset,
Size: size,
})
newRef, err := cbor.DumpObject(&refs)
if err != nil {
return err
}
return st.keys.Put(dshelp.CidToDsKey(cid), newRef) // TODO: batch somehow
}
func (r *refStorer) Read(p []byte) (n int, err error) {
offset := 0
if len(r.remaining) > 0 {
offset += len(r.remaining)
read := copy(p, r.remaining)
if read == len(r.remaining) {
r.remaining = nil
} else {
r.remaining = r.remaining[read:]
}
return read, nil
}
for {
data, offset, nd, err := r.blockReader.ReadBlock(context.TODO())
if err != nil {
return 0, err
}
if len(data) == 0 {
panic("Handle intermediate nodes") // TODO: !
}
if err := r.writeRef(nd.Cid(), offset, uint32(len(data))); err != nil {
return 0, err
}
read := copy(p, data)
if read < len(data) {
r.remaining = data[read:]
}
// TODO: read multiple
return read, nil
}
}
func (st *SectorBlocks) AddUnixfsPiece(ref cid.Cid, r UnixfsReader, keepAtLeast uint64) (sectorID uint64, err error) {
size, err := r.Size()
if err != nil {
return 0, err
}
refst := &refStorer{blockReader: r, pieceRef: string(SerializationUnixfs0) + ref.String(), writeRef: st.writeRef}
return st.Store.AddPiece(refst.pieceRef, uint64(size), refst)
}
func (st *SectorBlocks) List() (map[cid.Cid][]api.SealedRef, error) {
res, err := st.keys.Query(query.Query{})
if err != nil {
return nil, err
}
ents, err := res.Rest()
if err != nil {
return nil, err
}
out := map[cid.Cid][]api.SealedRef{}
for _, ent := range ents {
refCid, err := dshelp.DsKeyToCid(datastore.RawKey(ent.Key))
if err != nil {
return nil, err
}
var refs []api.SealedRef
if err := cbor.DecodeInto(ent.Value, &refs); err != nil {
return nil, err
}
out[refCid] = refs
}
return out, nil
}
func (st *SectorBlocks) GetRefs(k cid.Cid) ([]api.SealedRef, error) { // TODO: track local sectors
ent, err := st.keys.Get(dshelp.CidToDsKey(k))
if err != nil {
return nil, err
}
var refs []api.SealedRef
if err := cbor.DecodeInto(ent, &refs); err != nil {
return nil, err
}
return refs, nil
}
func (st *SectorBlocks) Has(k cid.Cid) (bool, error) {
// TODO: ensure sector is still there
return st.keys.Has(dshelp.CidToDsKey(k))
}
func (st *SectorBlocks) SealedBlockstore(approveUnseal func() error) *SectorBlockStore {
return &SectorBlockStore{
//local: nil, // TODO: Pass staging
sectorBlocks: st,
approveUnseal: approveUnseal,
}
}

View File

@ -2,177 +2,77 @@ package sectorblocks
import ( import (
"context" "context"
"github.com/filecoin-project/go-lotus/api" "github.com/ipfs/go-block-format"
"github.com/filecoin-project/go-lotus/node/modules/dtypes"
"github.com/ipfs/go-datastore/namespace"
"github.com/ipfs/go-datastore/query"
"sync"
"github.com/ipfs/go-cid" "github.com/ipfs/go-cid"
"github.com/ipfs/go-datastore" blockstore "github.com/ipfs/go-ipfs-blockstore"
dshelp "github.com/ipfs/go-ipfs-ds-help"
files "github.com/ipfs/go-ipfs-files"
cbor "github.com/ipfs/go-ipld-cbor"
"github.com/filecoin-project/go-lotus/storage/sector"
) )
type SealSerialization uint8 type SectorBlockStore struct {
// local blockstore.Blockstore // staging before GC // TODO: Pass staging
sectorBlocks *SectorBlocks
const ( approveUnseal func() error
SerializationUnixfs0 SealSerialization = 'u'
)
var dsPrefix = datastore.NewKey("/sealedblocks")
type SectorBlocks struct {
*sector.Store
keys datastore.Batching
keyLk sync.Mutex
} }
func NewSectorBlocks(sectst *sector.Store, ds dtypes.MetadataDS) *SectorBlocks { func (s *SectorBlockStore) DeleteBlock(cid.Cid) error {
return &SectorBlocks{ panic("not supported")
Store: sectst, }
keys: namespace.Wrap(ds, dsPrefix), func (s *SectorBlockStore) GetSize(cid.Cid) (int, error) {
} panic("not supported")
} }
type UnixfsReader interface { func (s *SectorBlockStore) Put(blocks.Block) error {
files.File panic("not supported")
// ReadBlock reads data from a single unixfs block. Data is nil
// for intermediate nodes
ReadBlock(context.Context) (data []byte, offset uint64, cid cid.Cid, err error)
} }
type refStorer struct { func (s *SectorBlockStore) PutMany([]blocks.Block) error {
blockReader UnixfsReader panic("not supported")
writeRef func(cid cid.Cid, offset uint64, size uint32) error
pieceRef string
remaining []byte
} }
func (st *SectorBlocks) writeRef(cid cid.Cid, offset uint64, size uint32) error { func (s *SectorBlockStore) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) {
st.keyLk.Lock() // TODO: make this multithreaded panic("not supported")
defer st.keyLk.Unlock() }
v, err := st.keys.Get(dshelp.CidToDsKey(cid)) func (s *SectorBlockStore) HashOnRead(enabled bool) {
if err == datastore.ErrNotFound { panic("not supported")
err = nil }
}
func (s *SectorBlockStore) Has(c cid.Cid) (bool, error) {
/*has, err := s.local.Has(c) // TODO: Pass staging
if err != nil { if err != nil {
return err return false, err
} }
if has {
return true, nil
}*/
var refs []api.SealedRef return s.sectorBlocks.Has(c)
if len(v) > 0 {
if err := cbor.DecodeInto(v, &refs); err != nil {
return err
}
}
refs = append(refs, api.SealedRef{
Piece: string(SerializationUnixfs0) + cid.String(),
Offset: offset,
Size: size,
})
newRef, err := cbor.DumpObject(&refs)
if err != nil {
return err
}
return st.keys.Put(dshelp.CidToDsKey(cid), newRef) // TODO: batch somehow
} }
func (r *refStorer) Read(p []byte) (n int, err error) { func (s *SectorBlockStore) Get(c cid.Cid) (blocks.Block, error) {
offset := 0 /*val, err := s.local.Get(c) // TODO: Pass staging
if len(r.remaining) > 0 { if err == nil {
offset += len(r.remaining) return val, nil
read := copy(p, r.remaining)
if read == len(r.remaining) {
r.remaining = nil
} else {
r.remaining = r.remaining[read:]
}
return read, nil
} }
if err != blockstore.ErrNotFound {
return nil, err
}*/
for { refs, err := s.sectorBlocks.GetRefs(c)
data, offset, cid, err := r.blockReader.ReadBlock(context.TODO())
if err != nil { if err != nil {
return 0, err return nil, err
}
if len(refs) == 0 {
return nil, blockstore.ErrNotFound
} }
if len(data) == 0 { data, err := s.sectorBlocks.unsealed.getRef(context.TODO(), refs, s.approveUnseal)
panic("Handle intermediate nodes") // TODO: !
}
if err := r.writeRef(cid, offset, uint32(len(data))); err != nil {
return 0, err
}
read := copy(p, data)
if read < len(data) {
r.remaining = data[read:]
}
// TODO: read multiple
return read, nil
}
}
func (st *SectorBlocks) AddUnixfsPiece(ref cid.Cid, r UnixfsReader, keepAtLeast uint64) (sectorID uint64, err error) {
size, err := r.Size()
if err != nil {
return 0, err
}
refst := &refStorer{blockReader: r, pieceRef: string(SerializationUnixfs0) + ref.String(), writeRef: st.writeRef}
return st.Store.AddPiece(refst.pieceRef, uint64(size), refst)
}
func (st *SectorBlocks) List() (map[cid.Cid][]api.SealedRef, error) {
res, err := st.keys.Query(query.Query{})
if err != nil { if err != nil {
return nil, err return nil, err
} }
ents, err := res.Rest() return blocks.NewBlockWithCid(data, c)
if err != nil {
return nil, err
}
out := map[cid.Cid][]api.SealedRef{}
for _, ent := range ents {
refCid, err := dshelp.DsKeyToCid(datastore.RawKey(ent.Key))
if err != nil {
return nil, err
}
var refs []api.SealedRef
if err := cbor.DecodeInto(ent.Value, &refs); err != nil {
return nil, err
}
out[refCid] = refs
}
return out, nil
} }
func (st *SectorBlocks) GetRefs(k cid.Cid) ([]api.SealedRef, error) { // TODO: track unsealed sectors
ent, err := st.keys.Get(dshelp.CidToDsKey(k))
if err != nil {
return nil, err
}
var refs []api.SealedRef var _ blockstore.Blockstore = &SectorBlockStore{}
if err := cbor.DecodeInto(ent, &refs); err != nil {
return nil, err
}
return refs, nil
}

View File

@ -0,0 +1,98 @@
package sectorblocks
import (
"context"
"sync"
logging "github.com/ipfs/go-log"
"github.com/filecoin-project/go-lotus/api"
"github.com/filecoin-project/go-lotus/lib/sectorbuilder"
)
var log = logging.Logger("sectorblocks")
type unsealedBlocks struct {
lk sync.Mutex
sb *sectorbuilder.SectorBuilder
// TODO: Treat this as some sort of cache, one with rather aggressive GC
// TODO: This REALLY, REALLY needs to be on-disk
unsealed map[string][]byte
unsealing map[string]chan struct{}
}
func (ub *unsealedBlocks) getRef(ctx context.Context, refs []api.SealedRef, approveUnseal func() error) ([]byte, error) {
var best api.SealedRef
ub.lk.Lock()
for _, ref := range refs {
b, ok := ub.unsealed[ref.Piece]
if ok {
ub.lk.Unlock()
return b[ref.Offset:ref.Offset + uint64(ref.Size)], nil // TODO: check slice math
}
// TODO: pick unsealing based on how long it's running (or just select all relevant, usually it'll be just one)
_, ok = ub.unsealing[ref.Piece]
if ok {
best = ref
break
}
best = ref
}
ub.lk.Unlock()
b, err := ub.maybeUnseal(ctx, best.Piece, approveUnseal)
if err != nil {
return nil, err
}
return b[best.Offset:best.Offset + uint64(best.Size)], nil // TODO: check slice math
}
func (ub *unsealedBlocks) maybeUnseal(ctx context.Context, pieceKey string, approveUnseal func() error) ([]byte, error) {
ub.lk.Lock()
defer ub.lk.Unlock()
out, ok := ub.unsealed[pieceKey]
if ok {
return out, nil
}
wait, ok := ub.unsealing[pieceKey]
if ok {
ub.lk.Unlock()
select {
case <-wait:
ub.lk.Lock()
// TODO: make sure this is not racy with gc when it's implemented
return ub.unsealed[pieceKey], nil
case <-ctx.Done():
ub.lk.Lock()
return nil, ctx.Err()
}
}
// TODO: doing this under a lock is suboptimal.. but simpler
err := approveUnseal()
if err != nil {
return nil, err
}
ub.unsealing[pieceKey] = make(chan struct{})
ub.lk.Unlock()
log.Infof("Unsealing piece '%s'", pieceKey)
data, err := ub.sb.ReadPieceFromSealedSector(pieceKey)
if err != nil {
return nil, err
}
ub.lk.Lock()
ub.unsealed[pieceKey] = data
close(ub.unsealing[pieceKey])
return data, nil
}