package sectorblocks import ( "bytes" "context" "encoding/binary" "errors" "io" "sync" "github.com/ipfs/go-datastore" "github.com/ipfs/go-datastore/namespace" "github.com/ipfs/go-datastore/query" dshelp "github.com/ipfs/go-ipfs-ds-help" "golang.org/x/xerrors" cborutil "github.com/filecoin-project/go-cbor-util" "github.com/filecoin-project/specs-actors/actors/abi" sealing "github.com/filecoin-project/storage-fsm" "github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/node/modules/dtypes" "github.com/filecoin-project/lotus/storage" ) type SealSerialization uint8 const ( SerializationUnixfs0 SealSerialization = 'u' ) var dsPrefix = datastore.NewKey("/sealedblocks") var ErrNotFound = errors.New("not found") func DealIDToDsKey(dealID abi.DealID) datastore.Key { buf := make([]byte, binary.MaxVarintLen64) size := binary.PutUvarint(buf, uint64(dealID)) return dshelp.NewKeyFromBinary(buf[:size]) } func DsKeyToDealID(key datastore.Key) (uint64, error) { buf, err := dshelp.BinaryFromDsKey(key) if err != nil { return 0, err } dealID, _ := binary.Uvarint(buf) return dealID, nil } type SectorBlocks struct { *storage.Miner keys datastore.Batching keyLk sync.Mutex } func NewSectorBlocks(miner *storage.Miner, ds dtypes.MetadataDS) *SectorBlocks { sbc := &SectorBlocks{ Miner: miner, keys: namespace.Wrap(ds, dsPrefix), } return sbc } func (st *SectorBlocks) writeRef(dealID abi.DealID, sectorID abi.SectorNumber, offset abi.PaddedPieceSize, size abi.UnpaddedPieceSize) error { st.keyLk.Lock() // TODO: make this multithreaded defer st.keyLk.Unlock() v, err := st.keys.Get(DealIDToDsKey(dealID)) if err == datastore.ErrNotFound { err = nil } if err != nil { return xerrors.Errorf("getting existing refs: %w", err) } var refs api.SealedRefs if len(v) > 0 { if err := cborutil.ReadCborRPC(bytes.NewReader(v), &refs); err != nil { return xerrors.Errorf("decoding existing refs: %w", err) } } refs.Refs = append(refs.Refs, api.SealedRef{ SectorID: sectorID, Offset: offset, Size: size, }) newRef, err := cborutil.Dump(&refs) if err != nil { return xerrors.Errorf("serializing refs: %w", err) } return st.keys.Put(DealIDToDsKey(dealID), newRef) // TODO: batch somehow } func (st *SectorBlocks) AddPiece(ctx context.Context, size abi.UnpaddedPieceSize, r io.Reader, d sealing.DealInfo) (abi.SectorNumber, abi.PaddedPieceSize, error) { sn, offset, err := st.Miner.AddPieceToAnySector(ctx, size, r, d) if err != nil { return 0, 0, err } err = st.writeRef(d.DealID, sn, offset, size) if err != nil { return 0, 0, xerrors.Errorf("writeRef: %w", err) } return sn, offset, nil } func (st *SectorBlocks) List() (map[uint64][]api.SealedRef, error) { res, err := st.keys.Query(query.Query{}) if err != nil { return nil, err } ents, err := res.Rest() if err != nil { return nil, err } out := map[uint64][]api.SealedRef{} for _, ent := range ents { dealID, err := DsKeyToDealID(datastore.RawKey(ent.Key)) if err != nil { return nil, err } var refs api.SealedRefs if err := cborutil.ReadCborRPC(bytes.NewReader(ent.Value), &refs); err != nil { return nil, err } out[dealID] = refs.Refs } return out, nil } func (st *SectorBlocks) GetRefs(dealID abi.DealID) ([]api.SealedRef, error) { // TODO: track local sectors ent, err := st.keys.Get(DealIDToDsKey(dealID)) if err == datastore.ErrNotFound { err = ErrNotFound } if err != nil { return nil, err } var refs api.SealedRefs if err := cborutil.ReadCborRPC(bytes.NewReader(ent), &refs); err != nil { return nil, err } return refs.Refs, nil } func (st *SectorBlocks) GetSize(dealID abi.DealID) (uint64, error) { refs, err := st.GetRefs(dealID) if err != nil { return 0, err } return uint64(refs[0].Size), nil } func (st *SectorBlocks) Has(dealID abi.DealID) (bool, error) { // TODO: ensure sector is still there return st.keys.Has(DealIDToDsKey(dealID)) }