167 lines
4.1 KiB
Go
167 lines
4.1 KiB
Go
package retrievaladapter
|
|
|
|
import (
|
|
"fmt"
|
|
"path/filepath"
|
|
"sync"
|
|
|
|
"github.com/ipfs/go-cid"
|
|
bstore "github.com/ipfs/go-ipfs-blockstore"
|
|
"github.com/ipld/go-car/v2/blockstore"
|
|
"golang.org/x/xerrors"
|
|
|
|
"github.com/filecoin-project/go-fil-markets/retrievalmarket"
|
|
|
|
"github.com/filecoin-project/lotus/api"
|
|
lbstore "github.com/filecoin-project/lotus/blockstore"
|
|
)
|
|
|
|
// ProxyBlockstoreAccessor is an accessor that returns a fixed blockstore.
|
|
// To be used in combination with IPFS integration.
|
|
type ProxyBlockstoreAccessor struct {
|
|
Blockstore bstore.Blockstore
|
|
}
|
|
|
|
var _ retrievalmarket.BlockstoreAccessor = (*ProxyBlockstoreAccessor)(nil)
|
|
|
|
func NewFixedBlockstoreAccessor(bs bstore.Blockstore) retrievalmarket.BlockstoreAccessor {
|
|
return &ProxyBlockstoreAccessor{Blockstore: bs}
|
|
}
|
|
|
|
func (p *ProxyBlockstoreAccessor) Get(_ retrievalmarket.DealID, _ retrievalmarket.PayloadCID) (bstore.Blockstore, error) {
|
|
return p.Blockstore, nil
|
|
}
|
|
|
|
func (p *ProxyBlockstoreAccessor) Done(_ retrievalmarket.DealID) error {
|
|
return nil
|
|
}
|
|
|
|
func NewAPIBlockstoreAdapter(sub retrievalmarket.BlockstoreAccessor) *APIBlockstoreAccessor {
|
|
return &APIBlockstoreAccessor{
|
|
sub: sub,
|
|
retrStores: map[retrievalmarket.DealID]api.RemoteStoreID{},
|
|
remoteStores: map[api.RemoteStoreID]bstore.Blockstore{},
|
|
}
|
|
}
|
|
|
|
// APIBlockstoreAccessor adds support to API-specified remote blockstores
|
|
type APIBlockstoreAccessor struct {
|
|
sub retrievalmarket.BlockstoreAccessor
|
|
|
|
retrStores map[retrievalmarket.DealID]api.RemoteStoreID
|
|
remoteStores map[api.RemoteStoreID]bstore.Blockstore
|
|
|
|
accessLk sync.Mutex
|
|
}
|
|
|
|
func (a *APIBlockstoreAccessor) Get(id retrievalmarket.DealID, payloadCID retrievalmarket.PayloadCID) (bstore.Blockstore, error) {
|
|
a.accessLk.Lock()
|
|
defer a.accessLk.Unlock()
|
|
|
|
as, has := a.retrStores[id]
|
|
if !has {
|
|
return a.sub.Get(id, payloadCID)
|
|
}
|
|
|
|
return a.remoteStores[as], nil
|
|
}
|
|
|
|
func (a *APIBlockstoreAccessor) Done(id retrievalmarket.DealID) error {
|
|
a.accessLk.Lock()
|
|
defer a.accessLk.Unlock()
|
|
|
|
if _, has := a.retrStores[id]; has {
|
|
delete(a.retrStores, id)
|
|
return nil
|
|
}
|
|
return a.sub.Done(id)
|
|
}
|
|
|
|
func (a *APIBlockstoreAccessor) RegisterDealToRetrievalStore(id retrievalmarket.DealID, sid api.RemoteStoreID) error {
|
|
a.accessLk.Lock()
|
|
defer a.accessLk.Unlock()
|
|
|
|
if _, has := a.retrStores[id]; has {
|
|
return xerrors.Errorf("apistore for deal %d already registered", id)
|
|
}
|
|
if _, has := a.remoteStores[sid]; !has {
|
|
return xerrors.Errorf("remote store not found")
|
|
}
|
|
|
|
a.retrStores[id] = sid
|
|
return nil
|
|
}
|
|
|
|
func (a *APIBlockstoreAccessor) RegisterApiStore(sid api.RemoteStoreID, st *lbstore.NetworkStore) error {
|
|
a.accessLk.Lock()
|
|
defer a.accessLk.Unlock()
|
|
|
|
if _, has := a.remoteStores[sid]; has {
|
|
return xerrors.Errorf("remote store already registered with this uuid")
|
|
}
|
|
|
|
a.remoteStores[sid] = st
|
|
|
|
st.OnClose(func() {
|
|
a.accessLk.Lock()
|
|
defer a.accessLk.Unlock()
|
|
|
|
if _, has := a.remoteStores[sid]; has {
|
|
delete(a.remoteStores, sid)
|
|
}
|
|
})
|
|
return nil
|
|
}
|
|
|
|
var _ retrievalmarket.BlockstoreAccessor = &APIBlockstoreAccessor{}
|
|
|
|
type CARBlockstoreAccessor struct {
|
|
rootdir string
|
|
lk sync.Mutex
|
|
open map[retrievalmarket.DealID]*blockstore.ReadWrite
|
|
}
|
|
|
|
var _ retrievalmarket.BlockstoreAccessor = (*CARBlockstoreAccessor)(nil)
|
|
|
|
func NewCARBlockstoreAccessor(rootdir string) *CARBlockstoreAccessor {
|
|
return &CARBlockstoreAccessor{
|
|
rootdir: rootdir,
|
|
open: make(map[retrievalmarket.DealID]*blockstore.ReadWrite),
|
|
}
|
|
}
|
|
|
|
func (c *CARBlockstoreAccessor) Get(id retrievalmarket.DealID, payloadCid retrievalmarket.PayloadCID) (bstore.Blockstore, error) {
|
|
c.lk.Lock()
|
|
defer c.lk.Unlock()
|
|
|
|
bs, ok := c.open[id]
|
|
if ok {
|
|
return bs, nil
|
|
}
|
|
|
|
path := c.PathFor(id)
|
|
bs, err := blockstore.OpenReadWrite(path, []cid.Cid{payloadCid}, blockstore.UseWholeCIDs(true))
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
c.open[id] = bs
|
|
return bs, nil
|
|
}
|
|
|
|
func (c *CARBlockstoreAccessor) Done(id retrievalmarket.DealID) error {
|
|
c.lk.Lock()
|
|
defer c.lk.Unlock()
|
|
|
|
bs, ok := c.open[id]
|
|
if !ok {
|
|
return nil
|
|
}
|
|
|
|
delete(c.open, id)
|
|
return bs.Finalize()
|
|
}
|
|
|
|
func (c *CARBlockstoreAccessor) PathFor(id retrievalmarket.DealID) string {
|
|
return filepath.Join(c.rootdir, fmt.Sprintf("%d.car", id))
|
|
}
|