lotus/node/repo/fsrepo.go

457 lines
10 KiB
Go
Raw Normal View History

package repo
import (
"encoding/json"
"fmt"
"github.com/filecoin-project/sector-storage/stores"
"io"
"io/ioutil"
"os"
"path/filepath"
"strings"
2019-07-10 17:09:57 +00:00
"sync"
"github.com/ipfs/go-datastore"
2019-07-10 17:09:57 +00:00
"github.com/ipfs/go-datastore/namespace"
badger "github.com/ipfs/go-ds-badger2"
fslock "github.com/ipfs/go-fs-lock"
logging "github.com/ipfs/go-log/v2"
2019-07-10 17:09:57 +00:00
"github.com/mitchellh/go-homedir"
"github.com/multiformats/go-base32"
"github.com/multiformats/go-multiaddr"
"golang.org/x/xerrors"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/node/config"
)
// Names of the files and directories that make up a repo on disk. All of
// them are joined onto the repo root path before use.
const (
	// fsLock is the lock file name handed to fslock.
	fsLock = "repo.lock"
	// fsConfig is the node configuration file.
	fsConfig = "config.toml"
	// fsStorageConfig is the storage configuration file.
	fsStorageConfig = "storage.json"
	// fsDatastore is the directory holding the badger datastore.
	fsDatastore = "datastore"
	// fsKeystore is the directory holding one file per stored key.
	fsKeystore = "keystore"
	// fsAPI is the file holding the API endpoint multiaddr.
	fsAPI = "api"
	// fsAPIToken is the file holding the API token.
	fsAPIToken = "token"
)
// RepoType selects the kind of repo (full node, storage miner or worker).
type RepoType int

// Valid repo types start at 1 so that the zero value of RepoType is invalid.
const (
	FullNode RepoType = iota + 1
	StorageMiner
	Worker
)
func defConfForType(t RepoType) interface{} {
switch t {
2019-11-12 17:59:38 +00:00
case FullNode:
return config.DefaultFullNode()
2019-11-12 17:59:38 +00:00
case StorageMiner:
return config.DefaultStorageMiner()
2020-03-13 01:37:38 +00:00
case Worker:
2020-03-13 11:59:19 +00:00
return &struct{}{}
default:
panic(fmt.Sprintf("unknown RepoType(%d)", int(t)))
}
}
2019-07-10 17:09:57 +00:00
var log = logging.Logger("repo")
2019-07-26 11:45:25 +00:00
var ErrRepoExists = xerrors.New("repo exists")
2019-07-10 17:09:57 +00:00
// FsRepo is a repo rooted at a directory on the local file system.
// Use NewFS to create one.
type FsRepo struct {
	// path is the repo root directory.
	path string
	// repoType is the kind of repo; NewFS leaves it at its zero value.
	repoType RepoType
}

// Compile-time check that *FsRepo implements Repo.
var _ Repo = (*FsRepo)(nil)
// NewFS creates a repo instance based on a path on file system
func NewFS(path string) (*FsRepo, error) {
2019-07-10 17:09:57 +00:00
path, err := homedir.Expand(path)
if err != nil {
2019-07-10 17:36:17 +00:00
return nil, err
2019-07-10 17:09:57 +00:00
}
return &FsRepo{
path: path,
}, nil
}
2019-07-23 21:54:54 +00:00
func (fsr *FsRepo) Exists() (bool, error) {
_, err := os.Stat(filepath.Join(fsr.path, fsDatastore))
2019-07-23 21:54:54 +00:00
notexist := os.IsNotExist(err)
if notexist {
err = nil
}
return !notexist, err
2019-07-19 09:23:24 +00:00
}
func (fsr *FsRepo) Init(t RepoType) error {
2019-07-30 23:18:21 +00:00
exist, err := fsr.Exists()
2020-03-05 19:21:06 +00:00
if err != nil {
2019-07-10 17:09:57 +00:00
return err
}
2019-07-30 23:18:21 +00:00
if exist {
return nil
}
2019-07-10 17:09:57 +00:00
log.Infof("Initializing repo at '%s'", fsr.path)
2019-07-30 23:18:21 +00:00
err = os.Mkdir(fsr.path, 0755) //nolint: gosec
if err != nil && !os.IsExist(err) {
return err
}
if err := fsr.initConfig(t); err != nil {
return xerrors.Errorf("init config: %w", err)
}
return fsr.initKeystore()
}
func (fsr *FsRepo) initConfig(t RepoType) error {
cfgP := filepath.Join(fsr.path, fsConfig)
_, err := os.Stat(cfgP)
if err == nil {
// exists
return nil
} else if !os.IsNotExist(err) {
return err
}
c, err := os.Create(cfgP)
2019-07-24 11:20:00 +00:00
if err != nil {
return err
}
comm, err := config.ConfigComment(defConfForType(t))
if err != nil {
return xerrors.Errorf("comment: %w", err)
}
_, err = c.Write(comm)
if err != nil {
return xerrors.Errorf("write config: %w", err)
}
2019-07-10 17:09:57 +00:00
if err := c.Close(); err != nil {
return xerrors.Errorf("close config: %w", err)
}
return nil
}
func (fsr *FsRepo) initKeystore() error {
kstorePath := filepath.Join(fsr.path, fsKeystore)
if _, err := os.Stat(kstorePath); err == nil {
return ErrRepoExists
} else if !os.IsNotExist(err) {
return err
}
return os.Mkdir(kstorePath, 0700)
2019-07-10 17:09:57 +00:00
}
// APIEndpoint returns endpoint of API in this repo
func (fsr *FsRepo) APIEndpoint() (multiaddr.Multiaddr, error) {
p := filepath.Join(fsr.path, fsAPI)
2019-07-26 03:26:29 +00:00
f, err := os.Open(p)
if os.IsNotExist(err) {
return nil, ErrNoAPIEndpoint
} else if err != nil {
return nil, err
}
defer f.Close() //nolint: errcheck // Read only op
data, err := ioutil.ReadAll(f)
if err != nil {
2019-07-26 11:45:25 +00:00
return nil, xerrors.Errorf("failed to read %q: %w", p, err)
}
strma := string(data)
strma = strings.TrimSpace(strma)
apima, err := multiaddr.NewMultiaddr(strma)
if err != nil {
return nil, err
}
return apima, nil
}
2019-07-23 18:49:09 +00:00
// APIToken returns the raw contents of the API token file under the repo
// root.
//
// When the token file does not exist it returns ErrNoAPIEndpoint; any other
// open error is returned unchanged.
func (fsr *FsRepo) APIToken() ([]byte, error) {
	tokenPath := filepath.Join(fsr.path, fsAPIToken)

	f, err := os.Open(tokenPath)
	switch {
	case os.IsNotExist(err):
		return nil, ErrNoAPIEndpoint
	case err != nil:
		return nil, err
	}
	defer f.Close() //nolint: errcheck // Read only op

	return ioutil.ReadAll(f)
}
// Lock acquires the exclusive lock on this repo and returns a LockedRepo
// handle tagged with repoType.
//
// It first asks fslock whether the lock file is already held and returns
// ErrRepoAlreadyLocked if so; failures to check or to take the lock are
// wrapped and returned.
func (fsr *FsRepo) Lock(repoType RepoType) (LockedRepo, error) {
	held, err := fslock.Locked(fsr.path, fsLock)
	switch {
	case err != nil:
		return nil, xerrors.Errorf("could not check lock status: %w", err)
	case held:
		return nil, ErrRepoAlreadyLocked
	}

	unlock, err := fslock.Lock(fsr.path, fsLock)
	if err != nil {
		return nil, xerrors.Errorf("could not lock the repo: %w", err)
	}

	lr := &fsLockedRepo{
		path:     fsr.path,
		repoType: repoType,
		closer:   unlock,
	}
	return lr, nil
}
type fsLockedRepo struct {
path string
repoType RepoType
closer io.Closer
2019-07-10 17:09:57 +00:00
2019-07-10 17:28:49 +00:00
ds datastore.Batching
dsErr error
2019-07-10 17:09:57 +00:00
dsOnce sync.Once
2020-03-09 19:22:30 +00:00
storageLk sync.Mutex
}
2019-07-12 09:59:18 +00:00
// Path returns the root directory of the locked repo.
func (fsr *fsLockedRepo) Path() string {
	root := fsr.path
	return root
}
func (fsr *fsLockedRepo) Close() error {
err := os.Remove(fsr.join(fsAPI))
if err != nil && !os.IsNotExist(err) {
return xerrors.Errorf("could not remove API file: %w", err)
}
2019-09-17 14:34:22 +00:00
if fsr.ds != nil {
if err := fsr.ds.Close(); err != nil {
return xerrors.Errorf("could not close datastore: %w", err)
}
}
err = fsr.closer.Close()
fsr.closer = nil
return err
}
// join builds a path under the repo root from the given elements.
func (fsr *fsLockedRepo) join(paths ...string) string {
	elems := make([]string, 0, len(paths)+1)
	elems = append(elems, fsr.path)
	elems = append(elems, paths...)
	return filepath.Join(elems...)
}
// stillValid returns ErrClosedRepo once the lock closer has been cleared,
// and nil while the repo is still open.
func (fsr *fsLockedRepo) stillValid() error {
	if fsr.closer != nil {
		return nil
	}
	return ErrClosedRepo
}
2019-07-10 17:09:57 +00:00
func (fsr *fsLockedRepo) Datastore(ns string) (datastore.Batching, error) {
fsr.dsOnce.Do(func() {
2019-10-29 18:04:35 +00:00
opts := badger.DefaultOptions
opts.Truncate = true
fsr.ds, fsr.dsErr = badger.NewDatastore(fsr.join(fsDatastore), &opts)
2019-07-12 10:17:44 +00:00
/*if fsr.dsErr == nil {
fsr.ds = datastore.NewLogDatastore(fsr.ds, "fsrepo")
}*/
2019-07-10 17:09:57 +00:00
})
if fsr.dsErr != nil {
return nil, fsr.dsErr
}
return namespace.Wrap(fsr.ds, datastore.NewKey(ns)), nil
}
// Config loads the repo's config file via config.FromFile, passing the
// default configuration for this repo's type.
//
// It returns ErrClosedRepo if the repo has been closed.
func (fsr *fsLockedRepo) Config() (interface{}, error) {
	err := fsr.stillValid()
	if err != nil {
		return nil, err
	}

	cfgPath := fsr.join(fsConfig)
	return config.FromFile(cfgPath, defConfForType(fsr.repoType))
}
func (fsr *fsLockedRepo) GetStorage() (stores.StorageConfig, error) {
2020-03-09 19:22:30 +00:00
fsr.storageLk.Lock()
defer fsr.storageLk.Unlock()
2020-03-09 21:46:17 +00:00
return fsr.getStorage(nil)
2020-03-09 19:22:30 +00:00
}
func (fsr *fsLockedRepo) getStorage(def *stores.StorageConfig) (stores.StorageConfig, error) {
2020-03-09 21:46:17 +00:00
c, err := config.StorageFromFile(fsr.join(fsStorageConfig), def)
2020-03-03 22:19:22 +00:00
if err != nil {
return stores.StorageConfig{}, err
2020-03-03 22:19:22 +00:00
}
return *c, nil
}
func (fsr *fsLockedRepo) SetStorage(c func(*stores.StorageConfig)) error {
2020-03-09 19:22:30 +00:00
fsr.storageLk.Lock()
defer fsr.storageLk.Unlock()
sc, err := fsr.getStorage(&stores.StorageConfig{})
2020-03-09 19:22:30 +00:00
if err != nil {
return xerrors.Errorf("get storage: %w", err)
}
c(&sc)
return config.WriteStorageFile(fsr.join(fsStorageConfig), sc)
2020-03-03 22:19:22 +00:00
}
func (fsr *fsLockedRepo) SetAPIEndpoint(ma multiaddr.Multiaddr) error {
if err := fsr.stillValid(); err != nil {
return err
}
2019-07-11 11:52:07 +00:00
return ioutil.WriteFile(fsr.join(fsAPI), []byte(ma.String()), 0644)
}
2019-07-23 18:49:09 +00:00
// SetAPIToken writes token to the API token file under the repo root with
// mode 0600.
//
// It returns ErrClosedRepo if the repo has been closed.
func (fsr *fsLockedRepo) SetAPIToken(token []byte) error {
	err := fsr.stillValid()
	if err != nil {
		return err
	}

	tokenPath := fsr.join(fsAPIToken)
	return ioutil.WriteFile(tokenPath, token, 0600)
}
// KeyStore returns the keystore of this repo. The locked repo itself acts
// as the keystore through its List, Get, Put and Delete methods.
//
// It returns ErrClosedRepo if the repo has been closed.
func (fsr *fsLockedRepo) KeyStore() (types.KeyStore, error) {
	err := fsr.stillValid()
	if err != nil {
		return nil, err
	}
	return fsr, nil
}
// kstrPermissionMsg is the error format used when a keystore file has any
// group or other permission bit set. Its arguments are the key name and the
// file mode.
var kstrPermissionMsg = "permissions of key: '%s' are too relaxed, required: 0600, got: %#o"
// List lists all the keys stored in the KeyStore.
//
// Every entry of the keystore directory is expected to be named with the
// unpadded base32 encoding of the key name. List fails if any entry has a
// group or other permission bit set, or if a file name does not decode.
// The result order follows the directory read order and is not sorted.
//
// It returns ErrClosedRepo if the repo has been closed.
func (fsr *fsLockedRepo) List() ([]string, error) {
	if err := fsr.stillValid(); err != nil {
		return nil, err
	}

	kstorePath := fsr.join(fsKeystore)
	dir, err := os.Open(kstorePath)
	if err != nil {
		return nil, xerrors.Errorf("opening dir to list keystore: %w", err)
	}
	// Release the directory handle on every return path; it was previously
	// leaked.
	defer dir.Close() //nolint: errcheck // read only op

	files, err := dir.Readdir(-1)
	if err != nil {
		return nil, xerrors.Errorf("reading keystore dir: %w", err)
	}

	keys := make([]string, 0, len(files))
	for _, f := range files {
		// Key files must be accessible by the owner only.
		if f.Mode()&0077 != 0 {
			return nil, xerrors.Errorf(kstrPermissionMsg, f.Name(), f.Mode())
		}
		name, err := base32.RawStdEncoding.DecodeString(f.Name())
		if err != nil {
			return nil, xerrors.Errorf("decoding key: '%s': %w", f.Name(), err)
		}
		keys = append(keys, string(name))
	}
	return keys, nil
}
// Get reads a key from the keystore and returns the types.KeyInfo
// corresponding to the named key.
//
// The key file is named with the unpadded base32 encoding of name and holds
// the JSON-encoded KeyInfo. A missing file yields an error wrapping
// types.ErrKeyInfoNotFound; a file with any group or other permission bit
// set is rejected. On every error the zero KeyInfo is returned.
func (fsr *fsLockedRepo) Get(name string) (types.KeyInfo, error) {
	var none types.KeyInfo

	if err := fsr.stillValid(); err != nil {
		return none, err
	}

	keyPath := fsr.join(fsKeystore, base32.RawStdEncoding.EncodeToString([]byte(name)))

	fstat, err := os.Stat(keyPath)
	switch {
	case os.IsNotExist(err):
		return none, xerrors.Errorf("opening key '%s': %w", name, types.ErrKeyInfoNotFound)
	case err != nil:
		return none, xerrors.Errorf("opening key '%s': %w", name, err)
	}

	// Key files must be accessible by the owner only.
	if mode := fstat.Mode(); mode&0077 != 0 {
		return none, xerrors.Errorf(kstrPermissionMsg, name, mode)
	}

	file, err := os.Open(keyPath)
	if err != nil {
		return none, xerrors.Errorf("opening key '%s': %w", name, err)
	}
	defer file.Close() //nolint: errcheck // read only op

	data, err := ioutil.ReadAll(file)
	if err != nil {
		return none, xerrors.Errorf("reading key '%s': %w", name, err)
	}

	var res types.KeyInfo
	if err := json.Unmarshal(data, &res); err != nil {
		return none, xerrors.Errorf("decoding key '%s': %w", name, err)
	}
	return res, nil
}
// Put saves key info under given name
func (fsr *fsLockedRepo) Put(name string, info types.KeyInfo) error {
if err := fsr.stillValid(); err != nil {
return err
}
encName := base32.RawStdEncoding.EncodeToString([]byte(name))
keyPath := fsr.join(fsKeystore, encName)
_, err := os.Stat(keyPath)
if err == nil {
2019-10-18 11:39:31 +00:00
return xerrors.Errorf("checking key before put '%s': %w", name, types.ErrKeyExists)
} else if !os.IsNotExist(err) {
return xerrors.Errorf("checking key before put '%s': %w", name, err)
}
keyData, err := json.Marshal(info)
if err != nil {
return xerrors.Errorf("encoding key '%s': %w", name, err)
}
err = ioutil.WriteFile(keyPath, keyData, 0600)
if err != nil {
return xerrors.Errorf("writing key '%s': %w", name, err)
}
return nil
}
// Delete removes the key stored under the given name.
//
// The key file is named with the unpadded base32 encoding of name. If no
// such file exists, an error wrapping types.ErrKeyInfoNotFound is returned.
func (fsr *fsLockedRepo) Delete(name string) error {
	if err := fsr.stillValid(); err != nil {
		return err
	}

	keyPath := fsr.join(fsKeystore, base32.RawStdEncoding.EncodeToString([]byte(name)))

	switch _, err := os.Stat(keyPath); {
	case os.IsNotExist(err):
		return xerrors.Errorf("checking key before delete '%s': %w", name, types.ErrKeyInfoNotFound)
	case err != nil:
		return xerrors.Errorf("checking key before delete '%s': %w", name, err)
	}

	if err := os.Remove(keyPath); err != nil {
		return xerrors.Errorf("deleting key '%s': %w", name, err)
	}
	return nil
}