From cad3efb9babb37c51647a3f9b892478cc93b1c02 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Mon, 26 Aug 2019 12:04:57 +0200 Subject: [PATCH] Command to list sealed blocks --- api/api.go | 18 ++++- api/struct.go | 6 ++ chain/deals/handler.go | 6 +- chain/deals/handler_states.go | 6 +- cmd/lotus-storage-miner/sectors.go | 26 +++++++ lib/jsonrpc/client.go | 1 + node/builder.go | 4 +- node/impl/storminer.go | 18 +++++ .../blockstore.go} | 73 +++++++++++++------ 9 files changed, 125 insertions(+), 33 deletions(-) rename storage/{sealedbstore/sealedbstore.go => sectorblocks/blockstore.go} (61%) diff --git a/api/api.go b/api/api.go index 8850a9d03..39ef937f8 100644 --- a/api/api.go +++ b/api/api.go @@ -3,6 +3,9 @@ package api import ( "context" + "github.com/ipfs/go-cid" + "github.com/ipfs/go-filestore" + cbor "github.com/ipfs/go-ipld-cbor" "github.com/libp2p/go-libp2p-core/network" "github.com/libp2p/go-libp2p-core/peer" @@ -10,12 +13,13 @@ import ( "github.com/filecoin-project/go-lotus/chain/address" "github.com/filecoin-project/go-lotus/chain/store" "github.com/filecoin-project/go-lotus/chain/types" - sectorbuilder "github.com/filecoin-project/go-sectorbuilder" - "github.com/ipfs/go-cid" - "github.com/ipfs/go-filestore" ) +func init() { + cbor.RegisterCborType(SealedRef{}) +} + type Common interface { // Auth AuthVerify(ctx context.Context, token string) ([]string, error) @@ -129,6 +133,8 @@ type StorageMiner interface { // Seal all staged sectors SectorsStagedSeal(context.Context) error + + SectorsRefs(context.Context) (map[string][]SealedRef, error) } // Version provides various build-time information @@ -178,3 +184,9 @@ type MinerPower struct { MinerPower types.BigInt TotalPower types.BigInt } + +type SealedRef struct { + Piece string + Offset uint64 + Size uint32 +} diff --git a/api/struct.go b/api/struct.go index 84fdbf1fc..73a3db4f1 100644 --- a/api/struct.go +++ b/api/struct.go @@ -101,6 +101,8 @@ type StorageMinerStruct struct { SectorsStatus func(context.Context, uint64) (sectorbuilder.SectorSealingStatus, error) `perm:"read"` SectorsStagedList func(context.Context) ([]sectorbuilder.StagedSectorMetadata, error) `perm:"read"` SectorsStagedSeal func(context.Context) error `perm:"write"` + + SectorsRefs func(context.Context) (map[string][]SealedRef, error) `perm:"read"` } } @@ -329,6 +331,10 @@ func (c *StorageMinerStruct) SectorsStagedSeal(ctx context.Context) error { return c.Internal.SectorsStagedSeal(ctx) } +func (c *StorageMinerStruct) SectorsRefs(ctx context.Context) (map[string][]SealedRef, error) { + return c.Internal.SectorsRefs(ctx) +} + var _ Common = &CommonStruct{} var _ FullNode = &FullNodeStruct{} var _ StorageMiner = &StorageMinerStruct{} diff --git a/chain/deals/handler.go b/chain/deals/handler.go index 6bf8713f1..f4e33c6b6 100644 --- a/chain/deals/handler.go +++ b/chain/deals/handler.go @@ -2,7 +2,7 @@ package deals import ( "context" - "github.com/filecoin-project/go-lotus/storage/sealedbstore" + "github.com/filecoin-project/go-lotus/storage/sectorblocks" "math" "github.com/filecoin-project/go-lotus/api" @@ -34,7 +34,7 @@ type MinerDeal struct { } type Handler struct { - secst *sealedbstore.Sealedbstore + secst *sectorblocks.SectorBlocks full api.FullNode // TODO: Use a custom protocol or graphsync in the future @@ -59,7 +59,7 @@ type dealUpdate struct { mut func(*MinerDeal) } -func NewHandler(ds dtypes.MetadataDS, secst *sealedbstore.Sealedbstore, dag dtypes.StagingDAG, fullNode api.FullNode) (*Handler, error) { +func NewHandler(ds dtypes.MetadataDS, secst *sectorblocks.SectorBlocks, dag dtypes.StagingDAG, fullNode api.FullNode) (*Handler, error) { addr, err := ds.Get(datastore.NewKey("miner-address")) if err != nil { return nil, err diff --git a/chain/deals/handler_states.go b/chain/deals/handler_states.go index 1693d5a95..c84320fb1 100644 --- a/chain/deals/handler_states.go +++ b/chain/deals/handler_states.go @@ -8,7 +8,7 @@ import ( "golang.org/x/xerrors" "github.com/filecoin-project/go-lotus/lib/sectorbuilder" - "github.com/filecoin-project/go-lotus/storage/sealedbstore" + "github.com/filecoin-project/go-lotus/storage/sectorblocks" ) type handlerFunc func(ctx context.Context, deal MinerDeal) (func(*MinerDeal), error) @@ -76,10 +76,10 @@ func (h *Handler) staged(ctx context.Context, deal MinerDeal) (func(*MinerDeal), return nil, xerrors.Errorf("cannot open unixfs file: %s", err) } - uf, ok := n.(sealedbstore.UnixfsReader) + uf, ok := n.(sectorblocks.UnixfsReader) if !ok { // we probably got directory, unsupported for now - return nil, xerrors.Errorf("unsupported unixfs type") + return nil, xerrors.Errorf("unsupported unixfs file type") } sectorID, err := h.secst.AddUnixfsPiece(deal.Proposal.PieceRef, uf, deal.Proposal.Duration) diff --git a/cmd/lotus-storage-miner/sectors.go b/cmd/lotus-storage-miner/sectors.go index 3fd5aab07..7aba38b0b 100644 --- a/cmd/lotus-storage-miner/sectors.go +++ b/cmd/lotus-storage-miner/sectors.go @@ -36,6 +36,7 @@ var sectorsCmd = &cli.Command{ sectorsStatusCmd, sectorsStagedListCmd, sectorsStagedSealCmd, + sectorsRefsCmd, }, } @@ -110,3 +111,28 @@ var sectorsStagedSealCmd = &cli.Command{ return nodeApi.SectorsStagedSeal(ctx) }, } + +var sectorsRefsCmd = &cli.Command{ + Name: "refs", + Usage: "List References to sectors", + Action: func(cctx *cli.Context) error { + nodeApi, err := lcli.GetStorageMinerAPI(cctx) + if err != nil { + return err + } + ctx := lcli.ReqContext(cctx) + + refs, err := nodeApi.SectorsRefs(ctx) + if err != nil { + return err + } + + for name, refs := range refs { + fmt.Printf("Block %s:\n", name) + for _, ref := range refs { + fmt.Printf("\t%s+%d %d bytes\n", ref.Piece, ref.Offset, ref.Size) + } + } + return nil + }, +} diff --git a/lib/jsonrpc/client.go b/lib/jsonrpc/client.go index dd95f9c8f..73c09e0ca 100644 --- a/lib/jsonrpc/client.go +++ b/lib/jsonrpc/client.go @@ -288,6 +288,7 @@ func (fn *rpcFunc) handleRpcCall(args []reflect.Value) (results []reflect.Value) if resp.Result != nil { log.Debugw("rpc result", "type", fn.ftyp.Out(fn.valOut)) if err := json.Unmarshal(resp.Result, val.Interface()); err != nil { + log.Warnw("unmarshaling failed", "message", string(resp.Result)) return fn.processError(xerrors.Errorf("unmarshaling result: %w", err)) } } diff --git a/node/builder.go b/node/builder.go index b9ca72c12..06d90227d 100644 --- a/node/builder.go +++ b/node/builder.go @@ -3,7 +3,7 @@ package node import ( "context" "errors" - "github.com/filecoin-project/go-lotus/storage/sealedbstore" + "github.com/filecoin-project/go-lotus/storage/sectorblocks" "reflect" "time" @@ -233,7 +233,7 @@ func Online() Option { ApplyIf(func(s *Settings) bool { return s.nodeType == nodeStorageMiner }, Override(new(*sectorbuilder.SectorBuilder), sectorbuilder.New), Override(new(*sector.Store), sector.NewStore), - Override(new(*sealedbstore.Sealedbstore), sealedbstore.NewSealedbstore), + Override(new(*sectorblocks.SectorBlocks), sectorblocks.NewSectorBlocks), Override(new(*storage.Miner), modules.StorageMiner), Override(new(dtypes.StagingDAG), modules.StagingDAG), diff --git a/node/impl/storminer.go b/node/impl/storminer.go index dab62407d..6c2856735 100644 --- a/node/impl/storminer.go +++ b/node/impl/storminer.go @@ -3,6 +3,7 @@ package impl import ( "context" "fmt" + "github.com/filecoin-project/go-lotus/storage/sectorblocks" "io" "math/rand" @@ -19,6 +20,7 @@ type StorageMinerAPI struct { SectorBuilderConfig *sectorbuilder.SectorBuilderConfig SectorBuilder *sectorbuilder.SectorBuilder Sectors *sector.Store + SectorBlocks *sectorblocks.SectorBlocks Miner *storage.Miner } @@ -53,4 +55,20 @@ func (sm *StorageMinerAPI) SectorsStagedSeal(context.Context) error { return sm.SectorBuilder.SealAllStagedSectors() } +func (sm *StorageMinerAPI) SectorsRefs(context.Context) (map[string][]api.SealedRef, error) { + // json can't handle cids as map keys + out := map[string][]api.SealedRef{} + + refs, err := sm.SectorBlocks.List() + if err != nil { + return nil, err + } + + for k, v := range refs { + out[k.String()] = v + } + + return out, nil +} + var _ api.StorageMiner = &StorageMinerAPI{} diff --git a/storage/sealedbstore/sealedbstore.go b/storage/sectorblocks/blockstore.go similarity index 61% rename from storage/sealedbstore/sealedbstore.go rename to storage/sectorblocks/blockstore.go index 0c9d4df4b..970e67925 100644 --- a/storage/sealedbstore/sealedbstore.go +++ b/storage/sectorblocks/blockstore.go @@ -1,8 +1,11 @@ -package sealedbstore +package sectorblocks import ( + "context" + "github.com/filecoin-project/go-lotus/api" "github.com/filecoin-project/go-lotus/node/modules/dtypes" "github.com/ipfs/go-datastore/namespace" + "github.com/ipfs/go-datastore/query" "sync" "github.com/ipfs/go-cid" @@ -22,23 +25,15 @@ const ( var dsPrefix = datastore.NewKey("/sealedblocks") -type SealedRef struct { - Serialization SealSerialization - - Piece cid.Cid - Offset uint64 - Size uint32 -} - -type Sealedbstore struct { +type SectorBlocks struct { *sector.Store keys datastore.Batching keyLk sync.Mutex } -func NewSealedbstore(sectst *sector.Store, ds dtypes.MetadataDS) *Sealedbstore { - return &Sealedbstore{ +func NewSectorBlocks(sectst *sector.Store, ds dtypes.MetadataDS) *SectorBlocks { + return &SectorBlocks{ Store: sectst, keys: namespace.Wrap(ds, dsPrefix), } @@ -49,7 +44,7 @@ type UnixfsReader interface { // ReadBlock reads data from a single unixfs block. Data is nil // for intermediate nodes - ReadBlock() (data []byte, offset uint64, cid cid.Cid, err error) + ReadBlock(context.Context) (data []byte, offset uint64, cid cid.Cid, err error) } type refStorer struct { @@ -60,7 +55,7 @@ type refStorer struct { remaining []byte } -func (st *Sealedbstore) writeRef(cid cid.Cid, offset uint64, size uint32) error { +func (st *SectorBlocks) writeRef(cid cid.Cid, offset uint64, size uint32) error { st.keyLk.Lock() // TODO: make this multithreaded defer st.keyLk.Unlock() @@ -72,18 +67,17 @@ func (st *Sealedbstore) writeRef(cid cid.Cid, offset uint64, size uint32) error return err } - var refs []SealedRef + var refs []api.SealedRef if len(v) > 0 { if err := cbor.DecodeInto(v, &refs); err != nil { return err } } - refs = append(refs, SealedRef{ - Serialization: SerializationUnixfs0, - Piece: cid, - Offset: offset, - Size: size, + refs = append(refs, api.SealedRef{ + Piece: string(SerializationUnixfs0) + cid.String(), + Offset: offset, + Size: size, }) newRef, err := cbor.DumpObject(&refs) @@ -107,7 +101,7 @@ func (r *refStorer) Read(p []byte) (n int, err error) { } for { - data, offset, cid, err := r.blockReader.ReadBlock() + data, offset, cid, err := r.blockReader.ReadBlock(context.TODO()) if err != nil { return 0, err } @@ -116,10 +110,16 @@ func (r *refStorer) Read(p []byte) (n int, err error) { return 0, err } + read := copy(p, data) + if read < len(data) { + r.remaining = data[read:] + } + // TODO: read multiple + return read, nil } } -func (st *Sealedbstore) AddUnixfsPiece(ref cid.Cid, r UnixfsReader, keepAtLeast uint64) (sectorID uint64, err error) { +func (st *SectorBlocks) AddUnixfsPiece(ref cid.Cid, r UnixfsReader, keepAtLeast uint64) (sectorID uint64, err error) { size, err := r.Size() if err != nil { return 0, err @@ -129,3 +129,32 @@ func (st *Sealedbstore) AddUnixfsPiece(ref cid.Cid, r UnixfsReader, keepAtLeast return st.Store.AddPiece(refst.pieceRef, uint64(size), refst) } + +func (st *SectorBlocks) List() (map[cid.Cid][]api.SealedRef, error) { + res, err := st.keys.Query(query.Query{}) + if err != nil { + return nil, err + } + + ents, err := res.Rest() + if err != nil { + return nil, err + } + + out := map[cid.Cid][]api.SealedRef{} + for _, ent := range ents { + refCid, err := dshelp.DsKeyToCid(datastore.RawKey(ent.Key)) + if err != nil { + return nil, err + } + + var refs []api.SealedRef + if err := cbor.DecodeInto(ent.Value, &refs); err != nil { + return nil, err + } + + out[refCid] = refs + } + + return out, nil +}