// Source listing: lotus/lib/ipfsbstore/ipfsbstore.go (Go, 120 lines, 2.7 KiB).

package ipfsbstore
import (
"bytes"
"context"
"io/ioutil"
"golang.org/x/xerrors"
"github.com/multiformats/go-multiaddr"
"github.com/multiformats/go-multihash"
blocks "github.com/ipfs/go-block-format"
"github.com/ipfs/go-cid"
blockstore "github.com/ipfs/go-ipfs-blockstore"
httpapi "github.com/ipfs/go-ipfs-http-client"
iface "github.com/ipfs/interface-go-ipfs-core"
"github.com/ipfs/interface-go-ipfs-core/options"
"github.com/ipfs/interface-go-ipfs-core/path"
)
// IpfsBstore is a blockstore.Blockstore implementation that forwards block
// operations to an IPFS node through a CoreAPI handle.
type IpfsBstore struct {
	// ctx is the context handed to every API call issued by the methods of
	// this type; it is fixed at construction time.
	ctx context.Context
	// api is the IPFS core API whose Block() sub-API serves the requests.
	api iface.CoreAPI
}
// NewIpfsBstore builds an IpfsBstore on top of the API handle returned by
// httpapi.NewLocalApi.
//
// ctx is stored in the returned store and used for the API calls it makes.
// An error is returned, wrapped with context, when the local API handle
// cannot be obtained.
func NewIpfsBstore(ctx context.Context) (*IpfsBstore, error) {
	localAPI, err := httpapi.NewLocalApi()
	if err != nil {
		return nil, xerrors.Errorf("getting local ipfs api: %w", err)
	}

	store := &IpfsBstore{ctx: ctx, api: localAPI}
	return store, nil
}
// NewRemoteIpfsBstore builds an IpfsBstore on top of the API handle returned
// by httpapi.NewApi for the given multiaddr.
//
// ctx is stored in the returned store and used for the API calls it makes.
// An error is returned, wrapped with context, when the API handle for maddr
// cannot be created.
func NewRemoteIpfsBstore(ctx context.Context, maddr multiaddr.Multiaddr) (*IpfsBstore, error) {
	remoteAPI, err := httpapi.NewApi(maddr)
	if err != nil {
		return nil, xerrors.Errorf("getting remote ipfs api: %w", err)
	}

	store := &IpfsBstore{ctx: ctx, api: remoteAPI}
	return store, nil
}
// DeleteBlock is not implemented by this store: it ignores cid and always
// returns a "not supported" error.
func (i *IpfsBstore) DeleteBlock(cid cid.Cid) error {
	errUnsupported := xerrors.Errorf("not supported")
	return errUnsupported
}
// Has reports whether the block identified by cid can be stat'ed through the
// IPFS block API.
//
// It returns (true, nil) when the Stat call succeeds. Any Stat failure is
// returned as a wrapped error together with false.
//
// NOTE(review): because every Stat error is propagated, a block that is
// simply absent presumably surfaces as an error rather than (false, nil) —
// confirm this matches what callers of the Blockstore interface expect.
func (i *IpfsBstore) Has(cid cid.Cid) (bool, error) {
	if _, err := i.api.Block().Stat(i.ctx, path.IpldPath(cid)); err != nil {
		return false, xerrors.Errorf("getting ipfs block: %w", err)
	}
	return true, nil
}
// Get fetches the block identified by cid through the IPFS block API and
// returns it as a blocks.Block carrying that same cid.
//
// The reader returned by the API is drained fully into memory before the
// block is constructed. Errors from the API call and from reading the data
// are both wrapped with context; the result of blocks.NewBlockWithCid is
// returned as-is.
func (i *IpfsBstore) Get(cid cid.Cid) (blocks.Block, error) {
	rd, err := i.api.Block().Get(i.ctx, path.IpldPath(cid))
	if err != nil {
		return nil, xerrors.Errorf("getting ipfs block: %w", err)
	}

	data, err := ioutil.ReadAll(rd)
	if err != nil {
		// Wrap like every other failure path in this file so the caller can
		// tell a read failure apart from a failed API request.
		return nil, xerrors.Errorf("reading ipfs block data: %w", err)
	}

	return blocks.NewBlockWithCid(data, cid)
}
// GetSize returns the size reported by the IPFS block API's Stat call for
// the block identified by cid.
//
// On a Stat failure it returns 0 and the error wrapped with context.
func (i *IpfsBstore) GetSize(cid cid.Cid) (int, error) {
	blockPath := path.IpldPath(cid)

	stat, err := i.api.Block().Stat(i.ctx, blockPath)
	if err != nil {
		return 0, xerrors.Errorf("getting ipfs block: %w", err)
	}

	return stat.Size(), nil
}
// Put uploads block's raw data through the IPFS block API.
//
// The multihash of the block's CID is decoded so the upload can request the
// same hash code and digest length, and the CID's codec is translated to its
// string name via cid.CodecToStr and passed as the block format.
//
// Errors from decoding the multihash and from the API call are wrapped with
// context, matching the other methods of this type.
func (i *IpfsBstore) Put(block blocks.Block) error {
	mhd, err := multihash.Decode(block.Cid().Hash())
	if err != nil {
		return xerrors.Errorf("decoding block multihash: %w", err)
	}

	// NOTE(review): if the codec has no entry in cid.CodecToStr the format
	// argument is presumably an empty string — confirm how the API treats it.
	_, err = i.api.Block().Put(i.ctx, bytes.NewReader(block.RawData()),
		options.Block.Hash(mhd.Code, mhd.Length),
		options.Block.Format(cid.CodecToStr[block.Cid().Type()]))
	if err != nil {
		return xerrors.Errorf("putting ipfs block: %w", err)
	}
	return nil
}
// PutMany stores each of the given blocks by calling Put on them one after
// another, in slice order.
//
// It stops at the first failing Put and returns that error unchanged; blocks
// stored before the failure are not rolled back here. A nil or empty slice
// yields nil.
func (i *IpfsBstore) PutMany(blocks []blocks.Block) error {
	// TODO: could be done in parallel
	for idx := range blocks {
		err := i.Put(blocks[idx])
		if err != nil {
			return err
		}
	}
	return nil
}
// AllKeysChan is not implemented by this store: it ignores ctx and always
// returns a nil channel together with a "not supported" error.
func (i *IpfsBstore) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) {
	var keys <-chan cid.Cid // stays nil; key enumeration is not provided
	return keys, xerrors.Errorf("not supported")
}
// HashOnRead is a no-op: the enabled flag is ignored and nothing is changed.
func (i *IpfsBstore) HashOnRead(enabled bool) {
	// TODO: We could technically support this, but..
}
// Compile-time check that *IpfsBstore satisfies blockstore.Blockstore.
var _ blockstore.Blockstore = (*IpfsBstore)(nil)