lotus/node/repo/importmgr/mgr.go

173 lines
4.0 KiB
Go
Raw Normal View History

2020-07-06 23:39:30 +00:00
package importmgr
import (
"encoding/json"
"fmt"
2021-07-03 04:40:56 +00:00
"io/ioutil"
"strconv"
2020-07-07 08:52:19 +00:00
2021-07-03 07:25:20 +00:00
"github.com/filecoin-project/go-fil-markets/shared"
2021-07-03 08:31:32 +00:00
"github.com/ipfs/go-cid"
2021-07-03 04:40:56 +00:00
"github.com/ipfs/go-datastore/query"
2021-07-07 08:40:59 +00:00
logging "github.com/ipfs/go-log/v2"
2020-07-07 08:52:19 +00:00
"golang.org/x/xerrors"
2020-07-06 23:39:30 +00:00
"github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/namespace"
)
2021-07-07 08:40:59 +00:00
var log = logging.Logger("importmgr")
2021-07-07 06:32:48 +00:00
type ImportID uint64
2020-07-06 23:39:30 +00:00
type Mgr struct {
2021-07-03 04:40:56 +00:00
ds datastore.Batching
repoPath string
2021-07-03 07:25:20 +00:00
counter *shared.TimeCounter
2020-07-06 23:39:30 +00:00
}
2020-07-07 08:52:19 +00:00
type Label string
2020-07-07 09:38:22 +00:00
2020-07-07 08:52:19 +00:00
const (
LSource = "source" // Function which created the import
LRootCid = "root" // Root CID
LFileName = "filename" // Local file path
LCARv2FilePath = "CARv2Path" // path of the CARv2 file.
2020-07-07 08:52:19 +00:00
)
2021-07-03 07:25:20 +00:00
func New(ds datastore.Batching, repoPath string) *Mgr {
2020-07-06 23:39:30 +00:00
return &Mgr{
2021-07-03 07:25:20 +00:00
repoPath: repoPath,
ds: datastore.NewLogDatastore(namespace.Wrap(ds, datastore.NewKey("/stores")), "storess"),
counter: shared.NewTimeCounter(),
2020-07-06 23:39:30 +00:00
}
}
2020-07-07 08:52:19 +00:00
type StoreMeta struct {
2020-07-06 23:39:30 +00:00
Labels map[string]string
}
2021-07-07 06:32:48 +00:00
func (m *Mgr) NewStore() (ImportID, error) {
2021-07-03 07:25:20 +00:00
id := m.counter.Next()
2020-07-06 23:39:30 +00:00
2020-07-07 08:52:19 +00:00
meta, err := json.Marshal(&StoreMeta{Labels: map[string]string{
2020-07-06 23:39:30 +00:00
"source": "unknown",
}})
if err != nil {
2021-07-03 07:25:20 +00:00
return 0, xerrors.Errorf("marshaling empty store metadata: %w", err)
2020-07-06 23:39:30 +00:00
}
err = m.ds.Put(datastore.NewKey(fmt.Sprintf("%d", id)), meta)
2021-07-07 06:32:48 +00:00
return ImportID(id), err
2020-07-06 23:39:30 +00:00
}
2021-07-07 06:32:48 +00:00
func (m *Mgr) AddLabel(id ImportID, key, value string) error { // source, file path, data CID..
2020-07-06 23:39:30 +00:00
meta, err := m.ds.Get(datastore.NewKey(fmt.Sprintf("%d", id)))
if err != nil {
return xerrors.Errorf("getting metadata form datastore: %w", err)
}
2020-07-07 08:52:19 +00:00
var sm StoreMeta
2020-07-06 23:39:30 +00:00
if err := json.Unmarshal(meta, &sm); err != nil {
return xerrors.Errorf("unmarshaling store meta: %w", err)
}
sm.Labels[key] = value
2020-07-07 09:38:09 +00:00
meta, err = json.Marshal(&sm)
2020-07-06 23:39:30 +00:00
if err != nil {
return xerrors.Errorf("marshaling store meta: %w", err)
}
return m.ds.Put(datastore.NewKey(fmt.Sprintf("%d", id)), meta)
}
2021-07-07 06:32:48 +00:00
func (m *Mgr) List() ([]ImportID, error) {
var keys []ImportID
2021-07-03 04:40:56 +00:00
qres, err := m.ds.Query(query.Query{KeysOnly: true})
if err != nil {
return nil, xerrors.Errorf("query error: %w", err)
}
defer qres.Close() //nolint:errcheck
for r := range qres.Next() {
k := r.Key
if string(k[0]) == "/" {
k = k[1:]
}
id, err := strconv.ParseUint(k, 10, 64)
if err != nil {
return nil, xerrors.Errorf("failed to parse key %s to uint64, err=%w", r.Key, err)
}
2021-07-07 06:32:48 +00:00
keys = append(keys, ImportID(id))
2021-07-03 04:40:56 +00:00
}
return keys, nil
2020-07-07 08:52:19 +00:00
}
2021-07-07 06:32:48 +00:00
func (m *Mgr) Info(id ImportID) (*StoreMeta, error) {
2020-07-07 08:52:19 +00:00
meta, err := m.ds.Get(datastore.NewKey(fmt.Sprintf("%d", id)))
if err != nil {
return nil, xerrors.Errorf("getting metadata form datastore: %w", err)
}
var sm StoreMeta
if err := json.Unmarshal(meta, &sm); err != nil {
return nil, xerrors.Errorf("unmarshaling store meta: %w", err)
}
return &sm, nil
}
2021-07-07 06:32:48 +00:00
func (m *Mgr) Remove(id ImportID) error {
2020-07-07 11:45:02 +00:00
if err := m.ds.Delete(datastore.NewKey(fmt.Sprintf("%d", id))); err != nil {
return xerrors.Errorf("removing import metadata: %w", err)
}
return nil
}
2021-07-03 04:40:56 +00:00
2021-07-03 08:31:32 +00:00
func (m *Mgr) CARV2FilePathFor(dagRoot cid.Cid) (string, error) {
importIDs, err := m.List()
if err != nil {
return "", xerrors.Errorf("failed to fetch import IDs: %w", err)
}
for _, importID := range importIDs {
info, err := m.Info(importID)
if err != nil {
2021-07-07 08:40:59 +00:00
log.Errorf("failed to fetch info, importID=%d: %s", importID, err)
2021-07-03 08:31:32 +00:00
continue
}
if info.Labels[LRootCid] == "" {
continue
}
c, err := cid.Parse(info.Labels[LRootCid])
if err != nil {
2021-07-07 08:40:59 +00:00
log.Errorf("failed to parse Root cid %s: %w", info.Labels[LRootCid], err)
2021-07-03 08:31:32 +00:00
continue
}
if c.Equals(dagRoot) {
return info.Labels[LCARv2FilePath], nil
}
}
return "", nil
}
2021-07-07 06:32:48 +00:00
func (m *Mgr) NewTempFile(importID ImportID) (string, error) {
file, err := ioutil.TempFile(m.repoPath, fmt.Sprintf("%d", importID))
2021-07-03 04:40:56 +00:00
if err != nil {
return "", xerrors.Errorf("failed to create temp file: %w", err)
}
// close the file as we need to return the path here.
if err := file.Close(); err != nil {
return "", xerrors.Errorf("failed to close temp file: %w", err)
}
return file.Name(), nil
}