correctly handle identity cids

This commit is contained in:
vyzo 2021-07-09 11:31:04 +03:00
parent 909f7039d4
commit de5e21bf1a

View File

@ -18,6 +18,7 @@ import (
cid "github.com/ipfs/go-cid"
dstore "github.com/ipfs/go-datastore"
logging "github.com/ipfs/go-log/v2"
mh "github.com/multiformats/go-multihash"
cbg "github.com/whyrusleeping/cbor-gen"
"github.com/filecoin-project/go-state-types/abi"
@ -210,6 +211,10 @@ func (s *SplitStore) DeleteMany(_ []cid.Cid) error {
}
func (s *SplitStore) Has(cid cid.Cid) (bool, error) {
if isIdentiyCid(cid) {
return true, nil
}
s.txnLk.RLock()
defer s.txnLk.RUnlock()
@ -232,6 +237,15 @@ func (s *SplitStore) Has(cid cid.Cid) (bool, error) {
}
func (s *SplitStore) Get(cid cid.Cid) (blocks.Block, error) {
if isIdentiyCid(cid) {
data, err := decodeIdentityCid(cid)
if err != nil {
return nil, err
}
return blocks.NewBlockWithCid(data, cid)
}
s.txnLk.RLock()
defer s.txnLk.RUnlock()
@ -269,6 +283,15 @@ func (s *SplitStore) Get(cid cid.Cid) (blocks.Block, error) {
}
func (s *SplitStore) GetSize(cid cid.Cid) (int, error) {
if isIdentiyCid(cid) {
data, err := decodeIdentityCid(cid)
if err != nil {
return 0, err
}
return len(data), nil
}
s.txnLk.RLock()
defer s.txnLk.RUnlock()
@ -305,6 +328,10 @@ func (s *SplitStore) GetSize(cid cid.Cid) (int, error) {
}
func (s *SplitStore) Put(blk blocks.Block) error {
if isIdentiyCid(blk.Cid()) {
return nil
}
s.txnLk.RLock()
defer s.txnLk.RUnlock()
@ -324,6 +351,31 @@ func (s *SplitStore) Put(blk blocks.Block) error {
}
func (s *SplitStore) PutMany(blks []blocks.Block) error {
// filter identites
idcids := 0
for _, blk := range blks {
if isIdentiyCid(blk.Cid()) {
idcids++
}
}
if idcids > 0 {
if idcids == len(blks) {
// it's all identities
return nil
}
filtered := make([]blocks.Block, 0, len(blks)-idcids)
for _, blk := range blks {
if isIdentiyCid(blk.Cid()) {
continue
}
filtered = append(filtered, blk)
}
blks = filtered
}
batch := make([]cid.Cid, 0, len(blks))
for _, blk := range blks {
batch = append(batch, blk.Cid())
@ -387,6 +439,15 @@ func (s *SplitStore) HashOnRead(enabled bool) {
}
func (s *SplitStore) View(cid cid.Cid, cb func([]byte) error) error {
if isIdentiyCid(cid) {
data, err := decodeIdentityCid(cid)
if err != nil {
return err
}
return cb(data)
}
// optimistically protect the reference so that we can call the underlying View
// without holding hte lock.
// This allows the user callback to call into the blockstore without deadlocking.
@ -625,7 +686,7 @@ func (s *SplitStore) doTxnProtect(root cid.Cid, batch map[cid.Cid]struct{}) erro
// cannot be deleted before the object itself.
err := s.walkObjectIncomplete(root, cid.NewSet(),
func(c cid.Cid) error {
if isFilCommitment(c) {
if isUnitaryObject(c) {
return errStopWalk
}
@ -713,7 +774,7 @@ func (s *SplitStore) doWarmup(curTs *types.TipSet) error {
missing := int64(0)
err := s.walkChain(curTs, epoch, false,
func(c cid.Cid) error {
if isFilCommitment(c) {
if isUnitaryObject(c) {
return errStopWalk
}
@ -830,7 +891,7 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error {
var count int64
err = s.walkChain(curTs, boundaryEpoch, true,
func(c cid.Cid) error {
if isFilCommitment(c) {
if isUnitaryObject(c) {
return errStopWalk
}
@ -865,7 +926,7 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error {
return err
}
if isFilCommitment(c) {
if isUnitaryObject(c) {
continue
}
@ -880,7 +941,7 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error {
err = s.walkObjectIncomplete(c, walked,
func(c cid.Cid) error {
if isFilCommitment(c) {
if isUnitaryObject(c) {
return errStopWalk
}
@ -1536,7 +1597,7 @@ func (s *SplitStore) waitForMissingRefs() {
for c := range towalk {
err := s.walkObjectIncomplete(c, walked,
func(c cid.Cid) error {
if isFilCommitment(c) {
if isUnitaryObject(c) {
return errStopWalk
}
@ -1636,11 +1697,30 @@ func bytesToUint64(buf []byte) uint64 {
return i
}
func isFilCommitment(c cid.Cid) bool {
switch c.Prefix().Codec {
func isUnitaryObject(c cid.Cid) bool {
pre := c.Prefix()
switch pre.Codec {
case cid.FilCommitmentSealed, cid.FilCommitmentUnsealed:
return true
default:
return false
return pre.MhType == mh.IDENTITY
}
}
func isIdentiyCid(c cid.Cid) bool {
return c.Prefix().MhType == mh.IDENTITY
}
func decodeIdentityCid(c cid.Cid) ([]byte, error) {
dmh, err := mh.Decode(c.Hash())
if err != nil {
return nil, xerrors.Errorf("error decoding identity cid %s: %w", c, err)
}
// sanity check
if dmh.Code != mh.IDENTITY {
return nil, xerrors.Errorf("error decoding identity cid %s: hash type is not identity", c)
}
return dmh.Digest, nil
}