lotus/blockstore/ipfs.go

155 lines
3.9 KiB
Go
Raw Normal View History

package blockstore
import (
"bytes"
"context"
"io"
"github.com/ipfs/boxo/path"
blocks "github.com/ipfs/go-block-format"
"github.com/ipfs/go-cid"
2022-06-14 15:00:51 +00:00
"github.com/multiformats/go-multiaddr"
"github.com/multiformats/go-multihash"
"golang.org/x/xerrors"
2023-06-22 08:43:56 +00:00
rpc "github.com/filecoin-project/kubo-api-client"
iface "github.com/filecoin-project/kubo-api-client/coreiface"
"github.com/filecoin-project/kubo-api-client/coreiface/options"
)
2021-01-29 23:17:25 +00:00
type IPFSBlockstore struct {
2021-02-01 13:22:32 +00:00
ctx context.Context
api, offlineAPI iface.CoreAPI
}
2021-01-29 23:17:25 +00:00
var _ BasicBlockstore = (*IPFSBlockstore)(nil)
2021-01-29 23:17:25 +00:00
func NewLocalIPFSBlockstore(ctx context.Context, onlineMode bool) (Blockstore, error) {
localApi, err := rpc.NewLocalApi()
if err != nil {
return nil, xerrors.Errorf("getting local ipfs api: %w", err)
}
api, err := localApi.WithOptions(options.Api.Offline(!onlineMode))
if err != nil {
return nil, xerrors.Errorf("setting offline mode: %s", err)
}
offlineAPI := api
if onlineMode {
offlineAPI, err = localApi.WithOptions(options.Api.Offline(true))
if err != nil {
return nil, xerrors.Errorf("applying offline mode: %s", err)
}
2021-01-29 23:17:25 +00:00
}
bs := &IPFSBlockstore{
2021-02-01 13:22:32 +00:00
ctx: ctx,
api: api,
offlineAPI: offlineAPI,
}
return Adapt(bs), nil
}
2021-01-29 23:17:25 +00:00
func NewRemoteIPFSBlockstore(ctx context.Context, maddr multiaddr.Multiaddr, onlineMode bool) (Blockstore, error) {
httpApi, err := rpc.NewApi(maddr)
if err != nil {
return nil, xerrors.Errorf("setting remote ipfs api: %w", err)
}
api, err := httpApi.WithOptions(options.Api.Offline(!onlineMode))
if err != nil {
return nil, xerrors.Errorf("applying offline mode: %s", err)
}
offlineAPI := api
if onlineMode {
offlineAPI, err = httpApi.WithOptions(options.Api.Offline(true))
if err != nil {
return nil, xerrors.Errorf("applying offline mode: %s", err)
}
2021-01-29 23:17:25 +00:00
}
bs := &IPFSBlockstore{
2021-02-01 13:22:32 +00:00
ctx: ctx,
api: api,
offlineAPI: offlineAPI,
}
return Adapt(bs), nil
}
func (i *IPFSBlockstore) DeleteBlock(ctx context.Context, cid cid.Cid) error {
return xerrors.Errorf("not supported")
}
func (i *IPFSBlockstore) Has(ctx context.Context, cid cid.Cid) (bool, error) {
_, err := i.offlineAPI.Block().Stat(ctx, path.FromCid(cid))
if err != nil {
// The underlying client is running in Offline mode.
// Stat() will fail with an err if the block isn't in the
// blockstore. If that's the case, return false without
// an error since that's the original intention of this method.
if err.Error() == "blockservice: key not found" {
return false, nil
}
return false, xerrors.Errorf("getting ipfs block: %w", err)
}
return true, nil
}
func (i *IPFSBlockstore) Get(ctx context.Context, cid cid.Cid) (blocks.Block, error) {
rd, err := i.api.Block().Get(ctx, path.FromCid(cid))
if err != nil {
return nil, xerrors.Errorf("getting ipfs block: %w", err)
}
data, err := io.ReadAll(rd)
if err != nil {
return nil, err
}
return blocks.NewBlockWithCid(data, cid)
}
func (i *IPFSBlockstore) GetSize(ctx context.Context, cid cid.Cid) (int, error) {
st, err := i.api.Block().Stat(ctx, path.FromCid(cid))
if err != nil {
return 0, xerrors.Errorf("getting ipfs block: %w", err)
}
return st.Size(), nil
}
func (i *IPFSBlockstore) Put(ctx context.Context, block blocks.Block) error {
mhd, err := multihash.Decode(block.Cid().Hash())
if err != nil {
return err
}
_, err = i.api.Block().Put(ctx, bytes.NewReader(block.RawData()),
options.Block.Hash(mhd.Code, mhd.Length),
options.Block.Format(multihash.Codes[block.Cid().Type()]))
return err
}
func (i *IPFSBlockstore) PutMany(ctx context.Context, blocks []blocks.Block) error {
// TODO: could be done in parallel
for _, block := range blocks {
if err := i.Put(ctx, block); err != nil {
return err
}
}
return nil
}
2021-01-29 23:17:25 +00:00
func (i *IPFSBlockstore) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) {
return nil, xerrors.Errorf("not supported")
}
2021-12-11 21:03:00 +00:00
func (i *IPFSBlockstore) HashOnRead(enabled bool) {
return // TODO: We could technically support this, but..
}