3795cc2bd2
This paves the way for better object lifetime management. Concretely, it makes it possible to: - have different stores backing chain and state data. - having the same datastore library, but using different parameters. - attach different caching layers/policies to each class of data, e.g. sizing caches differently. - specifying different retention policies for chain and state data. This separation is important because: - access patterns/frequency of chain and state data are different. - state is derivable from chain, so one could never expunge the chain store, and only retain state objects reachable from the last finality in the state store.
394 lines
8.2 KiB
Go
394 lines
8.2 KiB
Go
package repo
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"io/ioutil"
|
|
"os"
|
|
"path/filepath"
|
|
"sync"
|
|
|
|
"github.com/google/uuid"
|
|
"github.com/ipfs/go-datastore"
|
|
"github.com/ipfs/go-datastore/namespace"
|
|
dssync "github.com/ipfs/go-datastore/sync"
|
|
"github.com/multiformats/go-multiaddr"
|
|
"golang.org/x/xerrors"
|
|
|
|
"github.com/filecoin-project/lotus/blockstore"
|
|
"github.com/filecoin-project/lotus/chain/types"
|
|
"github.com/filecoin-project/lotus/extern/sector-storage/fsutil"
|
|
"github.com/filecoin-project/lotus/extern/sector-storage/stores"
|
|
"github.com/filecoin-project/lotus/node/config"
|
|
)
|
|
|
|
type MemRepo struct {
|
|
api struct {
|
|
sync.Mutex
|
|
ma multiaddr.Multiaddr
|
|
token []byte
|
|
}
|
|
|
|
repoLock chan struct{}
|
|
token *byte
|
|
|
|
datastore datastore.Datastore
|
|
keystore map[string]types.KeyInfo
|
|
blockstore blockstore.Blockstore
|
|
|
|
// given a repo type, produce the default config
|
|
configF func(t RepoType) interface{}
|
|
|
|
// holds the current config value
|
|
config struct {
|
|
sync.Mutex
|
|
val interface{}
|
|
}
|
|
}
|
|
|
|
type lockedMemRepo struct {
|
|
mem *MemRepo
|
|
t RepoType
|
|
sync.RWMutex
|
|
|
|
tempDir string
|
|
token *byte
|
|
sc *stores.StorageConfig
|
|
}
|
|
|
|
func (lmem *lockedMemRepo) GetStorage() (stores.StorageConfig, error) {
|
|
if err := lmem.checkToken(); err != nil {
|
|
return stores.StorageConfig{}, err
|
|
}
|
|
|
|
if lmem.sc == nil {
|
|
lmem.sc = &stores.StorageConfig{StoragePaths: []stores.LocalPath{
|
|
{Path: lmem.Path()},
|
|
}}
|
|
}
|
|
|
|
return *lmem.sc, nil
|
|
}
|
|
|
|
func (lmem *lockedMemRepo) SetStorage(c func(*stores.StorageConfig)) error {
|
|
if err := lmem.checkToken(); err != nil {
|
|
return err
|
|
}
|
|
|
|
_, _ = lmem.GetStorage()
|
|
|
|
c(lmem.sc)
|
|
return nil
|
|
}
|
|
|
|
func (lmem *lockedMemRepo) Stat(path string) (fsutil.FsStat, error) {
|
|
return fsutil.Statfs(path)
|
|
}
|
|
|
|
func (lmem *lockedMemRepo) DiskUsage(path string) (int64, error) {
|
|
si, err := fsutil.FileSize(path)
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
return si.OnDisk, nil
|
|
}
|
|
|
|
func (lmem *lockedMemRepo) Path() string {
|
|
lmem.Lock()
|
|
defer lmem.Unlock()
|
|
|
|
if lmem.tempDir != "" {
|
|
return lmem.tempDir
|
|
}
|
|
|
|
t, err := ioutil.TempDir(os.TempDir(), "lotus-memrepo-temp-")
|
|
if err != nil {
|
|
panic(err) // only used in tests, probably fine
|
|
}
|
|
|
|
if lmem.t == StorageMiner {
|
|
if err := config.WriteStorageFile(filepath.Join(t, fsStorageConfig), stores.StorageConfig{
|
|
StoragePaths: []stores.LocalPath{
|
|
{Path: t},
|
|
}}); err != nil {
|
|
panic(err)
|
|
}
|
|
|
|
b, err := json.MarshalIndent(&stores.LocalStorageMeta{
|
|
ID: stores.ID(uuid.New().String()),
|
|
Weight: 10,
|
|
CanSeal: true,
|
|
CanStore: true,
|
|
}, "", " ")
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
|
|
if err := ioutil.WriteFile(filepath.Join(t, "sectorstore.json"), b, 0644); err != nil {
|
|
panic(err)
|
|
}
|
|
}
|
|
|
|
lmem.tempDir = t
|
|
return t
|
|
}
|
|
|
|
var _ Repo = &MemRepo{}
|
|
|
|
// MemRepoOptions contains options for memory repo
|
|
type MemRepoOptions struct {
|
|
Ds datastore.Datastore
|
|
ConfigF func(RepoType) interface{}
|
|
KeyStore map[string]types.KeyInfo
|
|
}
|
|
|
|
// NewMemory creates new memory based repo with provided options.
|
|
// opts can be nil, it will be replaced with defaults.
|
|
// Any field in opts can be nil, they will be replaced by defaults.
|
|
func NewMemory(opts *MemRepoOptions) *MemRepo {
|
|
if opts == nil {
|
|
opts = &MemRepoOptions{}
|
|
}
|
|
if opts.ConfigF == nil {
|
|
opts.ConfigF = defConfForType
|
|
}
|
|
if opts.Ds == nil {
|
|
opts.Ds = dssync.MutexWrap(datastore.NewMapDatastore())
|
|
}
|
|
if opts.KeyStore == nil {
|
|
opts.KeyStore = make(map[string]types.KeyInfo)
|
|
}
|
|
|
|
return &MemRepo{
|
|
repoLock: make(chan struct{}, 1),
|
|
blockstore: blockstore.WrapIDStore(blockstore.NewMemorySync()),
|
|
datastore: opts.Ds,
|
|
configF: opts.ConfigF,
|
|
keystore: opts.KeyStore,
|
|
}
|
|
}
|
|
|
|
func (mem *MemRepo) APIEndpoint() (multiaddr.Multiaddr, error) {
|
|
mem.api.Lock()
|
|
defer mem.api.Unlock()
|
|
if mem.api.ma == nil {
|
|
return nil, ErrNoAPIEndpoint
|
|
}
|
|
return mem.api.ma, nil
|
|
}
|
|
|
|
func (mem *MemRepo) APIToken() ([]byte, error) {
|
|
mem.api.Lock()
|
|
defer mem.api.Unlock()
|
|
if mem.api.ma == nil {
|
|
return nil, ErrNoAPIToken
|
|
}
|
|
return mem.api.token, nil
|
|
}
|
|
|
|
func (mem *MemRepo) Lock(t RepoType) (LockedRepo, error) {
|
|
select {
|
|
case mem.repoLock <- struct{}{}:
|
|
default:
|
|
return nil, ErrRepoAlreadyLocked
|
|
}
|
|
mem.token = new(byte)
|
|
|
|
return &lockedMemRepo{
|
|
mem: mem,
|
|
t: t,
|
|
token: mem.token,
|
|
}, nil
|
|
}
|
|
|
|
func (lmem *lockedMemRepo) Readonly() bool {
|
|
return false
|
|
}
|
|
|
|
func (lmem *lockedMemRepo) checkToken() error {
|
|
lmem.RLock()
|
|
defer lmem.RUnlock()
|
|
if lmem.mem.token != lmem.token {
|
|
return ErrClosedRepo
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (lmem *lockedMemRepo) Close() error {
|
|
if err := lmem.checkToken(); err != nil {
|
|
return err
|
|
}
|
|
lmem.Lock()
|
|
defer lmem.Unlock()
|
|
|
|
if lmem.mem.token != lmem.token {
|
|
return ErrClosedRepo
|
|
}
|
|
|
|
if lmem.tempDir != "" {
|
|
if err := os.RemoveAll(lmem.tempDir); err != nil {
|
|
return err
|
|
}
|
|
lmem.tempDir = ""
|
|
}
|
|
|
|
lmem.mem.token = nil
|
|
lmem.mem.api.Lock()
|
|
lmem.mem.api.ma = nil
|
|
lmem.mem.api.Unlock()
|
|
<-lmem.mem.repoLock // unlock
|
|
return nil
|
|
|
|
}
|
|
|
|
func (lmem *lockedMemRepo) Datastore(_ context.Context, ns string) (datastore.Batching, error) {
|
|
if err := lmem.checkToken(); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return namespace.Wrap(lmem.mem.datastore, datastore.NewKey(ns)), nil
|
|
}
|
|
|
|
func (lmem *lockedMemRepo) Blockstore(ctx context.Context, domain BlockstoreDomain) (blockstore.Blockstore, error) {
|
|
if domain != UniversalBlockstore {
|
|
return nil, ErrInvalidBlockstoreDomain
|
|
}
|
|
return lmem.mem.blockstore, nil
|
|
}
|
|
|
|
func (lmem *lockedMemRepo) SplitstorePath() (string, error) {
|
|
return ioutil.TempDir("", "splitstore.*")
|
|
}
|
|
|
|
func (lmem *lockedMemRepo) ListDatastores(ns string) ([]int64, error) {
|
|
return nil, nil
|
|
}
|
|
|
|
func (lmem *lockedMemRepo) DeleteDatastore(ns string) error {
|
|
/** poof **/
|
|
return nil
|
|
}
|
|
|
|
func (lmem *lockedMemRepo) Config() (interface{}, error) {
|
|
if err := lmem.checkToken(); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
lmem.mem.config.Lock()
|
|
defer lmem.mem.config.Unlock()
|
|
|
|
if lmem.mem.config.val == nil {
|
|
lmem.mem.config.val = lmem.mem.configF(lmem.t)
|
|
}
|
|
|
|
return lmem.mem.config.val, nil
|
|
}
|
|
|
|
func (lmem *lockedMemRepo) SetConfig(c func(interface{})) error {
|
|
if err := lmem.checkToken(); err != nil {
|
|
return err
|
|
}
|
|
|
|
lmem.mem.config.Lock()
|
|
defer lmem.mem.config.Unlock()
|
|
|
|
if lmem.mem.config.val == nil {
|
|
lmem.mem.config.val = lmem.mem.configF(lmem.t)
|
|
}
|
|
|
|
c(lmem.mem.config.val)
|
|
|
|
return nil
|
|
}
|
|
|
|
func (lmem *lockedMemRepo) SetAPIEndpoint(ma multiaddr.Multiaddr) error {
|
|
if err := lmem.checkToken(); err != nil {
|
|
return err
|
|
}
|
|
lmem.mem.api.Lock()
|
|
lmem.mem.api.ma = ma
|
|
lmem.mem.api.Unlock()
|
|
return nil
|
|
}
|
|
|
|
func (lmem *lockedMemRepo) SetAPIToken(token []byte) error {
|
|
if err := lmem.checkToken(); err != nil {
|
|
return err
|
|
}
|
|
lmem.mem.api.Lock()
|
|
lmem.mem.api.token = token
|
|
lmem.mem.api.Unlock()
|
|
return nil
|
|
}
|
|
|
|
func (lmem *lockedMemRepo) KeyStore() (types.KeyStore, error) {
|
|
if err := lmem.checkToken(); err != nil {
|
|
return nil, err
|
|
}
|
|
return lmem, nil
|
|
}
|
|
|
|
// Implement KeyStore on the same instance
|
|
|
|
// List lists all the keys stored in the KeyStore
|
|
func (lmem *lockedMemRepo) List() ([]string, error) {
|
|
if err := lmem.checkToken(); err != nil {
|
|
return nil, err
|
|
}
|
|
lmem.RLock()
|
|
defer lmem.RUnlock()
|
|
|
|
res := make([]string, 0, len(lmem.mem.keystore))
|
|
for k := range lmem.mem.keystore {
|
|
res = append(res, k)
|
|
}
|
|
return res, nil
|
|
}
|
|
|
|
// Get gets a key out of keystore and returns types.KeyInfo coresponding to named key
|
|
func (lmem *lockedMemRepo) Get(name string) (types.KeyInfo, error) {
|
|
if err := lmem.checkToken(); err != nil {
|
|
return types.KeyInfo{}, err
|
|
}
|
|
lmem.RLock()
|
|
defer lmem.RUnlock()
|
|
|
|
key, ok := lmem.mem.keystore[name]
|
|
if !ok {
|
|
return types.KeyInfo{}, xerrors.Errorf("getting key '%s': %w", name, types.ErrKeyInfoNotFound)
|
|
}
|
|
return key, nil
|
|
}
|
|
|
|
// Put saves key info under given name
|
|
func (lmem *lockedMemRepo) Put(name string, key types.KeyInfo) error {
|
|
if err := lmem.checkToken(); err != nil {
|
|
return err
|
|
}
|
|
lmem.Lock()
|
|
defer lmem.Unlock()
|
|
|
|
_, isThere := lmem.mem.keystore[name]
|
|
if isThere {
|
|
return xerrors.Errorf("putting key '%s': %w", name, types.ErrKeyExists)
|
|
}
|
|
|
|
lmem.mem.keystore[name] = key
|
|
return nil
|
|
}
|
|
|
|
func (lmem *lockedMemRepo) Delete(name string) error {
|
|
if err := lmem.checkToken(); err != nil {
|
|
return err
|
|
}
|
|
lmem.Lock()
|
|
defer lmem.Unlock()
|
|
|
|
_, isThere := lmem.mem.keystore[name]
|
|
if !isThere {
|
|
return xerrors.Errorf("deleting key '%s': %w", name, types.ErrKeyInfoNotFound)
|
|
}
|
|
delete(lmem.mem.keystore, name)
|
|
return nil
|
|
}
|