400 lines
		
	
	
		
			8.5 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			400 lines
		
	
	
		
			8.5 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
package repo
 | 
						|
 | 
						|
import (
 | 
						|
	"context"
 | 
						|
	"encoding/json"
 | 
						|
	"io/ioutil"
 | 
						|
	"os"
 | 
						|
	"path/filepath"
 | 
						|
	"sync"
 | 
						|
 | 
						|
	"github.com/google/uuid"
 | 
						|
	"github.com/ipfs/go-datastore"
 | 
						|
	"github.com/ipfs/go-datastore/namespace"
 | 
						|
	dssync "github.com/ipfs/go-datastore/sync"
 | 
						|
	"github.com/multiformats/go-multiaddr"
 | 
						|
	"golang.org/x/xerrors"
 | 
						|
 | 
						|
	"github.com/filecoin-project/lotus/blockstore"
 | 
						|
	"github.com/filecoin-project/lotus/chain/types"
 | 
						|
	"github.com/filecoin-project/lotus/extern/sector-storage/fsutil"
 | 
						|
	"github.com/filecoin-project/lotus/extern/sector-storage/stores"
 | 
						|
	"github.com/filecoin-project/lotus/node/config"
 | 
						|
)
 | 
						|
 | 
						|
type MemRepo struct {
 | 
						|
	api struct {
 | 
						|
		sync.Mutex
 | 
						|
		ma    multiaddr.Multiaddr
 | 
						|
		token []byte
 | 
						|
	}
 | 
						|
 | 
						|
	repoLock chan struct{}
 | 
						|
	token    *byte
 | 
						|
 | 
						|
	datastore  datastore.Datastore
 | 
						|
	keystore   map[string]types.KeyInfo
 | 
						|
	blockstore blockstore.Blockstore
 | 
						|
 | 
						|
	// given a repo type, produce the default config
 | 
						|
	configF func(t RepoType) interface{}
 | 
						|
 | 
						|
	// holds the current config value
 | 
						|
	config struct {
 | 
						|
		sync.Mutex
 | 
						|
		val interface{}
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
type lockedMemRepo struct {
 | 
						|
	mem *MemRepo
 | 
						|
	t   RepoType
 | 
						|
	sync.RWMutex
 | 
						|
 | 
						|
	tempDir string
 | 
						|
	token   *byte
 | 
						|
	sc      *stores.StorageConfig
 | 
						|
}
 | 
						|
 | 
						|
func (lmem *lockedMemRepo) GetStorage() (stores.StorageConfig, error) {
 | 
						|
	if err := lmem.checkToken(); err != nil {
 | 
						|
		return stores.StorageConfig{}, err
 | 
						|
	}
 | 
						|
 | 
						|
	if lmem.sc == nil {
 | 
						|
		lmem.sc = &stores.StorageConfig{StoragePaths: []stores.LocalPath{
 | 
						|
			{Path: lmem.Path()},
 | 
						|
		}}
 | 
						|
	}
 | 
						|
 | 
						|
	return *lmem.sc, nil
 | 
						|
}
 | 
						|
 | 
						|
func (lmem *lockedMemRepo) SetStorage(c func(*stores.StorageConfig)) error {
 | 
						|
	if err := lmem.checkToken(); err != nil {
 | 
						|
		return err
 | 
						|
	}
 | 
						|
 | 
						|
	_, _ = lmem.GetStorage()
 | 
						|
 | 
						|
	c(lmem.sc)
 | 
						|
	return nil
 | 
						|
}
 | 
						|
 | 
						|
func (lmem *lockedMemRepo) Stat(path string) (fsutil.FsStat, error) {
 | 
						|
	return fsutil.Statfs(path)
 | 
						|
}
 | 
						|
 | 
						|
func (lmem *lockedMemRepo) DiskUsage(path string) (int64, error) {
 | 
						|
	si, err := fsutil.FileSize(path)
 | 
						|
	if err != nil {
 | 
						|
		return 0, err
 | 
						|
	}
 | 
						|
	return si.OnDisk, nil
 | 
						|
}
 | 
						|
 | 
						|
func (lmem *lockedMemRepo) Path() string {
 | 
						|
	lmem.Lock()
 | 
						|
	defer lmem.Unlock()
 | 
						|
 | 
						|
	if lmem.tempDir != "" {
 | 
						|
		return lmem.tempDir
 | 
						|
	}
 | 
						|
 | 
						|
	t, err := ioutil.TempDir(os.TempDir(), "lotus-memrepo-temp-")
 | 
						|
	if err != nil {
 | 
						|
		panic(err) // only used in tests, probably fine
 | 
						|
	}
 | 
						|
 | 
						|
	if lmem.t == StorageMiner {
 | 
						|
		// this is required due to the method makeDealStaging from cmd/lotus-storage-miner/init.go
 | 
						|
		// deal-staging is the directory deal files are staged in before being sealed into sectors
 | 
						|
		// for offline deal flow.
 | 
						|
		if err := os.MkdirAll(filepath.Join(t, "deal-staging"), 0755); err != nil {
 | 
						|
			panic(err)
 | 
						|
		}
 | 
						|
		if err := config.WriteStorageFile(filepath.Join(t, fsStorageConfig), stores.StorageConfig{
 | 
						|
			StoragePaths: []stores.LocalPath{
 | 
						|
				{Path: t},
 | 
						|
			}}); err != nil {
 | 
						|
			panic(err)
 | 
						|
		}
 | 
						|
 | 
						|
		b, err := json.MarshalIndent(&stores.LocalStorageMeta{
 | 
						|
			ID:       stores.ID(uuid.New().String()),
 | 
						|
			Weight:   10,
 | 
						|
			CanSeal:  true,
 | 
						|
			CanStore: true,
 | 
						|
		}, "", "  ")
 | 
						|
		if err != nil {
 | 
						|
			panic(err)
 | 
						|
		}
 | 
						|
 | 
						|
		if err := ioutil.WriteFile(filepath.Join(t, "sectorstore.json"), b, 0644); err != nil {
 | 
						|
			panic(err)
 | 
						|
		}
 | 
						|
	}
 | 
						|
 | 
						|
	lmem.tempDir = t
 | 
						|
	return t
 | 
						|
}
 | 
						|
 | 
						|
var _ Repo = &MemRepo{}
 | 
						|
 | 
						|
// MemRepoOptions contains options for memory repo
 | 
						|
type MemRepoOptions struct {
 | 
						|
	Ds       datastore.Datastore
 | 
						|
	ConfigF  func(RepoType) interface{}
 | 
						|
	KeyStore map[string]types.KeyInfo
 | 
						|
}
 | 
						|
 | 
						|
// NewMemory creates new memory based repo with provided options.
 | 
						|
// opts can be nil, it  will be replaced with defaults.
 | 
						|
// Any field in opts can be nil, they will be replaced by defaults.
 | 
						|
func NewMemory(opts *MemRepoOptions) *MemRepo {
 | 
						|
	if opts == nil {
 | 
						|
		opts = &MemRepoOptions{}
 | 
						|
	}
 | 
						|
	if opts.ConfigF == nil {
 | 
						|
		opts.ConfigF = defConfForType
 | 
						|
	}
 | 
						|
	if opts.Ds == nil {
 | 
						|
		opts.Ds = dssync.MutexWrap(datastore.NewMapDatastore())
 | 
						|
	}
 | 
						|
	if opts.KeyStore == nil {
 | 
						|
		opts.KeyStore = make(map[string]types.KeyInfo)
 | 
						|
	}
 | 
						|
 | 
						|
	return &MemRepo{
 | 
						|
		repoLock:   make(chan struct{}, 1),
 | 
						|
		blockstore: blockstore.WrapIDStore(blockstore.NewMemorySync()),
 | 
						|
		datastore:  opts.Ds,
 | 
						|
		configF:    opts.ConfigF,
 | 
						|
		keystore:   opts.KeyStore,
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
func (mem *MemRepo) APIEndpoint() (multiaddr.Multiaddr, error) {
 | 
						|
	mem.api.Lock()
 | 
						|
	defer mem.api.Unlock()
 | 
						|
	if mem.api.ma == nil {
 | 
						|
		return nil, ErrNoAPIEndpoint
 | 
						|
	}
 | 
						|
	return mem.api.ma, nil
 | 
						|
}
 | 
						|
 | 
						|
func (mem *MemRepo) APIToken() ([]byte, error) {
 | 
						|
	mem.api.Lock()
 | 
						|
	defer mem.api.Unlock()
 | 
						|
	if mem.api.ma == nil {
 | 
						|
		return nil, ErrNoAPIToken
 | 
						|
	}
 | 
						|
	return mem.api.token, nil
 | 
						|
}
 | 
						|
 | 
						|
func (mem *MemRepo) Lock(t RepoType) (LockedRepo, error) {
 | 
						|
	select {
 | 
						|
	case mem.repoLock <- struct{}{}:
 | 
						|
	default:
 | 
						|
		return nil, ErrRepoAlreadyLocked
 | 
						|
	}
 | 
						|
	mem.token = new(byte)
 | 
						|
 | 
						|
	return &lockedMemRepo{
 | 
						|
		mem:   mem,
 | 
						|
		t:     t,
 | 
						|
		token: mem.token,
 | 
						|
	}, nil
 | 
						|
}
 | 
						|
 | 
						|
func (lmem *lockedMemRepo) Readonly() bool {
 | 
						|
	return false
 | 
						|
}
 | 
						|
 | 
						|
func (lmem *lockedMemRepo) checkToken() error {
 | 
						|
	lmem.RLock()
 | 
						|
	defer lmem.RUnlock()
 | 
						|
	if lmem.mem.token != lmem.token {
 | 
						|
		return ErrClosedRepo
 | 
						|
	}
 | 
						|
	return nil
 | 
						|
}
 | 
						|
 | 
						|
func (lmem *lockedMemRepo) Close() error {
 | 
						|
	if err := lmem.checkToken(); err != nil {
 | 
						|
		return err
 | 
						|
	}
 | 
						|
	lmem.Lock()
 | 
						|
	defer lmem.Unlock()
 | 
						|
 | 
						|
	if lmem.mem.token != lmem.token {
 | 
						|
		return ErrClosedRepo
 | 
						|
	}
 | 
						|
 | 
						|
	if lmem.tempDir != "" {
 | 
						|
		if err := os.RemoveAll(lmem.tempDir); err != nil {
 | 
						|
			return err
 | 
						|
		}
 | 
						|
		lmem.tempDir = ""
 | 
						|
	}
 | 
						|
 | 
						|
	lmem.mem.token = nil
 | 
						|
	lmem.mem.api.Lock()
 | 
						|
	lmem.mem.api.ma = nil
 | 
						|
	lmem.mem.api.Unlock()
 | 
						|
	<-lmem.mem.repoLock // unlock
 | 
						|
	return nil
 | 
						|
 | 
						|
}
 | 
						|
 | 
						|
func (lmem *lockedMemRepo) Datastore(_ context.Context, ns string) (datastore.Batching, error) {
 | 
						|
	if err := lmem.checkToken(); err != nil {
 | 
						|
		return nil, err
 | 
						|
	}
 | 
						|
 | 
						|
	return namespace.Wrap(lmem.mem.datastore, datastore.NewKey(ns)), nil
 | 
						|
}
 | 
						|
 | 
						|
func (lmem *lockedMemRepo) Blockstore(ctx context.Context, domain BlockstoreDomain) (blockstore.Blockstore, error) {
 | 
						|
	if domain != UniversalBlockstore {
 | 
						|
		return nil, ErrInvalidBlockstoreDomain
 | 
						|
	}
 | 
						|
	return lmem.mem.blockstore, nil
 | 
						|
}
 | 
						|
 | 
						|
func (lmem *lockedMemRepo) SplitstorePath() (string, error) {
 | 
						|
	return ioutil.TempDir("", "splitstore.*")
 | 
						|
}
 | 
						|
 | 
						|
func (lmem *lockedMemRepo) ListDatastores(ns string) ([]int64, error) {
 | 
						|
	return nil, nil
 | 
						|
}
 | 
						|
 | 
						|
func (lmem *lockedMemRepo) DeleteDatastore(ns string) error {
 | 
						|
	/** poof **/
 | 
						|
	return nil
 | 
						|
}
 | 
						|
 | 
						|
func (lmem *lockedMemRepo) Config() (interface{}, error) {
 | 
						|
	if err := lmem.checkToken(); err != nil {
 | 
						|
		return nil, err
 | 
						|
	}
 | 
						|
 | 
						|
	lmem.mem.config.Lock()
 | 
						|
	defer lmem.mem.config.Unlock()
 | 
						|
 | 
						|
	if lmem.mem.config.val == nil {
 | 
						|
		lmem.mem.config.val = lmem.mem.configF(lmem.t)
 | 
						|
	}
 | 
						|
 | 
						|
	return lmem.mem.config.val, nil
 | 
						|
}
 | 
						|
 | 
						|
func (lmem *lockedMemRepo) SetConfig(c func(interface{})) error {
 | 
						|
	if err := lmem.checkToken(); err != nil {
 | 
						|
		return err
 | 
						|
	}
 | 
						|
 | 
						|
	lmem.mem.config.Lock()
 | 
						|
	defer lmem.mem.config.Unlock()
 | 
						|
 | 
						|
	if lmem.mem.config.val == nil {
 | 
						|
		lmem.mem.config.val = lmem.mem.configF(lmem.t)
 | 
						|
	}
 | 
						|
 | 
						|
	c(lmem.mem.config.val)
 | 
						|
 | 
						|
	return nil
 | 
						|
}
 | 
						|
 | 
						|
func (lmem *lockedMemRepo) SetAPIEndpoint(ma multiaddr.Multiaddr) error {
 | 
						|
	if err := lmem.checkToken(); err != nil {
 | 
						|
		return err
 | 
						|
	}
 | 
						|
	lmem.mem.api.Lock()
 | 
						|
	lmem.mem.api.ma = ma
 | 
						|
	lmem.mem.api.Unlock()
 | 
						|
	return nil
 | 
						|
}
 | 
						|
 | 
						|
func (lmem *lockedMemRepo) SetAPIToken(token []byte) error {
 | 
						|
	if err := lmem.checkToken(); err != nil {
 | 
						|
		return err
 | 
						|
	}
 | 
						|
	lmem.mem.api.Lock()
 | 
						|
	lmem.mem.api.token = token
 | 
						|
	lmem.mem.api.Unlock()
 | 
						|
	return nil
 | 
						|
}
 | 
						|
 | 
						|
func (lmem *lockedMemRepo) KeyStore() (types.KeyStore, error) {
 | 
						|
	if err := lmem.checkToken(); err != nil {
 | 
						|
		return nil, err
 | 
						|
	}
 | 
						|
	return lmem, nil
 | 
						|
}
 | 
						|
 | 
						|
// Implement KeyStore on the same instance
 | 
						|
 | 
						|
// List lists all the keys stored in the KeyStore
 | 
						|
func (lmem *lockedMemRepo) List() ([]string, error) {
 | 
						|
	if err := lmem.checkToken(); err != nil {
 | 
						|
		return nil, err
 | 
						|
	}
 | 
						|
	lmem.RLock()
 | 
						|
	defer lmem.RUnlock()
 | 
						|
 | 
						|
	res := make([]string, 0, len(lmem.mem.keystore))
 | 
						|
	for k := range lmem.mem.keystore {
 | 
						|
		res = append(res, k)
 | 
						|
	}
 | 
						|
	return res, nil
 | 
						|
}
 | 
						|
 | 
						|
// Get gets a key out of keystore and returns types.KeyInfo coresponding to named key
 | 
						|
func (lmem *lockedMemRepo) Get(name string) (types.KeyInfo, error) {
 | 
						|
	if err := lmem.checkToken(); err != nil {
 | 
						|
		return types.KeyInfo{}, err
 | 
						|
	}
 | 
						|
	lmem.RLock()
 | 
						|
	defer lmem.RUnlock()
 | 
						|
 | 
						|
	key, ok := lmem.mem.keystore[name]
 | 
						|
	if !ok {
 | 
						|
		return types.KeyInfo{}, xerrors.Errorf("getting key '%s': %w", name, types.ErrKeyInfoNotFound)
 | 
						|
	}
 | 
						|
	return key, nil
 | 
						|
}
 | 
						|
 | 
						|
// Put saves key info under given name
 | 
						|
func (lmem *lockedMemRepo) Put(name string, key types.KeyInfo) error {
 | 
						|
	if err := lmem.checkToken(); err != nil {
 | 
						|
		return err
 | 
						|
	}
 | 
						|
	lmem.Lock()
 | 
						|
	defer lmem.Unlock()
 | 
						|
 | 
						|
	_, isThere := lmem.mem.keystore[name]
 | 
						|
	if isThere {
 | 
						|
		return xerrors.Errorf("putting key '%s': %w", name, types.ErrKeyExists)
 | 
						|
	}
 | 
						|
 | 
						|
	lmem.mem.keystore[name] = key
 | 
						|
	return nil
 | 
						|
}
 | 
						|
 | 
						|
func (lmem *lockedMemRepo) Delete(name string) error {
 | 
						|
	if err := lmem.checkToken(); err != nil {
 | 
						|
		return err
 | 
						|
	}
 | 
						|
	lmem.Lock()
 | 
						|
	defer lmem.Unlock()
 | 
						|
 | 
						|
	_, isThere := lmem.mem.keystore[name]
 | 
						|
	if !isThere {
 | 
						|
		return xerrors.Errorf("deleting key '%s': %w", name, types.ErrKeyInfoNotFound)
 | 
						|
	}
 | 
						|
	delete(lmem.mem.keystore, name)
 | 
						|
	return nil
 | 
						|
}
 |