package dagstore

import (
	"context"
	"errors"
	"os"
	"sync"
	"time"

	"github.com/ipfs/go-cid"
	ds "github.com/ipfs/go-datastore"
	levelds "github.com/ipfs/go-ds-leveldb"
	measure "github.com/ipfs/go-ds-measure"
	logging "github.com/ipfs/go-log/v2"
	ldbopts "github.com/syndtr/goleveldb/leveldb/opt"
	"golang.org/x/xerrors"

	"github.com/filecoin-project/lotus/node/config"

	"github.com/filecoin-project/dagstore"
	"github.com/filecoin-project/dagstore/index"
	"github.com/filecoin-project/dagstore/mount"
	"github.com/filecoin-project/dagstore/shard"
	"github.com/filecoin-project/go-fil-markets/stores"
)

// maxRecoverAttempts is the attempt limit handed to
// dagstore.RecoverImmediately when the recovery goroutine is started.
const maxRecoverAttempts = 1

var log = logging.Logger("dagstore-wrapper")

// Wrapper couples a dagstore.Interface with the background goroutines that
// service it (GC, trace logging and shard recovery) and implements
// stores.DAGStoreWrapper on top of it.
type Wrapper struct {
	// ctx and cancel are set by Start; cancel stops the background goroutines.
	ctx    context.Context
	cancel context.CancelFunc
	// backgroundWg tracks every goroutine launched by Start.
	backgroundWg sync.WaitGroup

	dagst      dagstore.Interface
	mountApi   MinerAPI
	failureCh  chan dagstore.ShardResult
	traceCh    chan dagstore.Trace
	gcInterval time.Duration
}

var _ stores.DAGStoreWrapper = (*Wrapper)(nil)

// NewDAGStore builds a DAG store from cfg, with the lotus mount registered
// under lotusScheme, and returns it together with a Wrapper around it.
//
// Nothing is started here; call Wrapper.Start to launch the background
// goroutines and the DAG store itself. If construction fails after the
// datastore has been opened, the datastore is closed again before returning.
func NewDAGStore(cfg config.DAGStoreConfig, mountApi MinerAPI) (*dagstore.DAGStore, *Wrapper, error) {
	// construct the DAG Store.
	registry := mount.NewRegistry()
	if err := registry.Register(lotusScheme, mountTemplate(mountApi)); err != nil {
		return nil, nil, xerrors.Errorf("failed to create registry: %w", err)
	}

	// The dagstore will write Shard failures to the `failureCh` here.
	failureCh := make(chan dagstore.ShardResult, 1)

	// The dagstore will write Trace events to the `traceCh` here.
	traceCh := make(chan dagstore.Trace, 32)

	dstore, err := newDatastore(cfg.DatastoreDir)
	if err != nil {
		return nil, nil, xerrors.Errorf("failed to create dagstore datastore in %s: %w", cfg.DatastoreDir, err)
	}

	// closeDatastore releases the datastore on a failed construction so the
	// handle is not leaked; a close failure is only logged because the
	// construction error is the one the caller needs to see.
	closeDatastore := func() {
		if cerr := dstore.Close(); cerr != nil {
			log.Warnw("failed to close dagstore datastore after construction error", "error", cerr)
		}
	}

	irepo, err := index.NewFSRepo(cfg.IndexDir)
	if err != nil {
		closeDatastore()
		return nil, nil, xerrors.Errorf("failed to initialise dagstore index repo: %w", err)
	}

	dcfg := dagstore.Config{
		TransientsDir: cfg.TransientsDir,
		IndexRepo:     irepo,
		Datastore:     dstore,
		MountRegistry: registry,
		FailureCh:     failureCh,
		TraceCh:       traceCh,
		// not limiting fetches globally, as the Lotus mount does
		// conditional throttling.
		MaxConcurrentIndex:        cfg.MaxConcurrentIndex,
		MaxConcurrentReadyFetches: cfg.MaxConcurrentReadyFetches,
		RecoverOnStart:            dagstore.RecoverOnAcquire,
	}

	dagst, err := dagstore.NewDAGStore(dcfg)
	if err != nil {
		closeDatastore()
		return nil, nil, xerrors.Errorf("failed to create DAG store: %w", err)
	}

	w := &Wrapper{
		dagst:      dagst,
		mountApi:   mountApi,
		failureCh:  failureCh,
		traceCh:    traceCh,
		gcInterval: cfg.GCInterval,
	}

	return dagst, w, nil
}

// newDatastore creates a datastore under the given base directory
// for dagstore metadata. The directory is created if it is missing, and the
// returned LevelDB datastore is wrapped with go-ds-measure.
func newDatastore(dir string) (ds.Batching, error) {
	// Create the datastore directory if it doesn't exist yet.
	if err := os.MkdirAll(dir, 0755); err != nil {
		return nil, xerrors.Errorf("failed to create directory %s for DAG store datastore: %w", dir, err)
	}

	// Create a new LevelDB datastore
	dstore, err := levelds.NewDatastore(dir, &levelds.Options{
		Compression: ldbopts.NoCompression,
		NoSync:      false,
		Strict:      ldbopts.StrictAll,
		ReadOnly:    false,
	})
	if err != nil {
		return nil, xerrors.Errorf("failed to open datastore for DAG store: %w", err)
	}

	// Keep statistics about the datastore
	mds := measure.New("measure.", dstore)
	return mds, nil
}

// Start launches the background goroutines (GC, trace logging and, when the
// underlying store is a *dagstore.DAGStore, shard recovery) and then starts
// the DAG store itself.
//
// The goroutines run on a context derived from ctx. If starting the DAG store
// fails, that context is cancelled so the goroutines do not outlive the
// failed start; the start error is returned unchanged.
func (w *Wrapper) Start(ctx context.Context) error {
	w.ctx, w.cancel = context.WithCancel(ctx)

	// Run a go-routine to do DagStore GC.
	w.backgroundWg.Add(1)
	go w.gcLoop()

	// run a go-routine to read the trace for debugging.
	w.backgroundWg.Add(1)
	go w.traceLoop()

	// Run a go-routine for shard recovery
	if dss, ok := w.dagst.(*dagstore.DAGStore); ok {
		w.backgroundWg.Add(1)
		go dagstore.RecoverImmediately(w.ctx, dss, w.failureCh, maxRecoverAttempts, w.backgroundWg.Done)
	}

	if err := w.dagst.Start(ctx); err != nil {
		// Stop the goroutines launched above; nothing else would cancel
		// them if the caller gives up on this wrapper.
		w.cancel()
		return err
	}
	return nil
}

// traceLoop logs every trace event received on traceCh at debug level until
// the wrapper context is cancelled.
func (w *Wrapper) traceLoop() {
	defer w.backgroundWg.Done()

	for w.ctx.Err() == nil {
		select {
		// Log trace events from the DAG store
		case tr := <-w.traceCh:
			log.Debugw("trace",
				"shard-key", tr.Key.String(),
				"op-type", tr.Op.String(),
				"after", tr.After.String())

		case <-w.ctx.Done():
			return
		}
	}
}

// gcLoop runs a DAG store GC every gcInterval until the wrapper context is
// cancelled. A failed GC is logged and the loop keeps going.
func (w *Wrapper) gcLoop() {
	defer w.backgroundWg.Done()

	ticker := time.NewTicker(w.gcInterval)
	defer ticker.Stop()

	for w.ctx.Err() == nil {
		select {
		// GC the DAG store on every tick
		case <-ticker.C:
			if _, err := w.dagst.GC(w.ctx); err != nil {
				log.Warnw("dagstore GC failed", "error", err)
			}

		// Exit when the DAG store wrapper is shutdown
		case <-w.ctx.Done():
			return
		}
	}
}

// LoadShard acquires the shard keyed by pieceCid and returns a blockstore
// over it; closing the returned blockstore closes the shard accessor.
//
// If the DAG store reports dagstore.ErrShardUnknown, the shard is registered
// synchronously (lazy initialisation, no existing transient) and the acquire
// is retried once. If the blockstore cannot be obtained from the accessor,
// the accessor is closed before the error is returned.
func (w *Wrapper) LoadShard(ctx context.Context, pieceCid cid.Cid) (stores.ClosableBlockstore, error) {
	log.Debugf("acquiring shard for piece CID %s", pieceCid)

	key := shard.KeyFromCID(pieceCid)
	resch := make(chan dagstore.ShardResult, 1)
	err := w.dagst.AcquireShard(ctx, key, resch, dagstore.AcquireOpts{})
	log.Debugf("sent message to acquire shard for piece CID %s", pieceCid)

	if err != nil {
		if !errors.Is(err, dagstore.ErrShardUnknown) {
			return nil, xerrors.Errorf("failed to schedule acquire shard for piece CID %s: %w", pieceCid, err)
		}

		// if the DAGStore does not know about the Shard -> register it and then try to acquire it again.
		log.Warnw("failed to load shard as shard is not registered, will re-register", "pieceCID", pieceCid)
		// The path of a transient file that we can ask the DAG Store to use
		// to perform the Indexing rather than fetching it via the Mount if
		// we already have a transient file. However, we don't have it here
		// and therefore we pass an empty file path.
		carPath := ""
		if err := stores.RegisterShardSync(ctx, w, pieceCid, carPath, false); err != nil {
			return nil, xerrors.Errorf("failed to re-register shard during loading piece CID %s: %w", pieceCid, err)
		}
		log.Warnw("successfully re-registered shard", "pieceCID", pieceCid)

		resch = make(chan dagstore.ShardResult, 1)
		if err := w.dagst.AcquireShard(ctx, key, resch, dagstore.AcquireOpts{}); err != nil {
			return nil, xerrors.Errorf("failed to acquire Shard for piece CID %s after re-registering: %w", pieceCid, err)
		}
	}

	// TODO: The context is not yet being actively monitored by the DAG store,
	// so we need to select against ctx.Done() until the following issue is
	// implemented:
	// https://github.com/filecoin-project/dagstore/issues/39
	//
	// NOTE(review): when ctx wins this select, a result delivered later is
	// left unread in resch; presumably its accessor is then never closed —
	// confirm against the dagstore implementation.
	var res dagstore.ShardResult
	select {
	case <-ctx.Done():
		return nil, ctx.Err()
	case res = <-resch:
		if res.Error != nil {
			return nil, xerrors.Errorf("failed to acquire shard for piece CID %s: %w", pieceCid, res.Error)
		}
	}

	bs, err := res.Accessor.Blockstore()
	if err != nil {
		// The accessor was acquired successfully, so release it here; the
		// caller receives no handle through which it could close it.
		if cerr := res.Accessor.Close(); cerr != nil {
			log.Warnw("failed to close shard accessor after blockstore error", "pieceCID", pieceCid, "error", cerr)
		}
		return nil, err
	}

	log.Debugf("successfully loaded blockstore for piece CID %s", pieceCid)
	return &Blockstore{ReadBlockstore: bs, Closer: res.Accessor}, nil
}

// RegisterShard submits a request to register pieceCid with the DAG store,
// backed by a lotus mount.
//
// carPath is passed on as the existing transient, eagerInit=false requests
// lazy initialisation, and resch is handed to the DAG store for the result.
// The returned error covers only creating the mount and scheduling the
// registration.
func (w *Wrapper) RegisterShard(ctx context.Context, pieceCid cid.Cid, carPath string, eagerInit bool, resch chan dagstore.ShardResult) error {
	// Create a lotus mount with the piece CID
	key := shard.KeyFromCID(pieceCid)
	mt, err := NewLotusMount(pieceCid, w.mountApi)
	if err != nil {
		return xerrors.Errorf("failed to create lotus mount for piece CID %s: %w", pieceCid, err)
	}

	// Register the shard
	opts := dagstore.RegisterOpts{
		ExistingTransient:  carPath,
		LazyInitialization: !eagerInit,
	}
	err = w.dagst.RegisterShard(ctx, key, mt, resch, opts)
	if err != nil {
		return xerrors.Errorf("failed to schedule register shard for piece CID %s: %w", pieceCid, err)
	}
	log.Debugf("successfully submitted Register Shard request for piece CID %s with eagerInit=%t", pieceCid, eagerInit)

	return nil
}

// Close cancels the background context, closes the DAG store and then waits
// for the background goroutines to exit.
//
// It is safe to call on a Wrapper that was never started. If closing the DAG
// store fails, that error is returned without waiting for the goroutines.
func (w *Wrapper) Close() error {
	// Cancel the context; cancel is nil when Start was never called.
	if w.cancel != nil {
		w.cancel()
	}

	// Close the DAG store
	log.Info("will close the dagstore")
	if err := w.dagst.Close(); err != nil {
		return xerrors.Errorf("failed to close DAG store: %w", err)
	}
	log.Info("dagstore closed")

	// Wait for the background go routine to exit
	log.Info("waiting for dagstore background wrapper routines to exist")
	w.backgroundWg.Wait()
	log.Info("exited dagstore background warpper routines")

	return nil
}