Snake a context through the Chain-blockstore creation

This commit is contained in:
Peter Rabbitson 2020-12-30 10:04:00 +00:00
parent bb5a92e2f4
commit 9334e73396
10 changed files with 25 additions and 17 deletions

View File

@ -139,7 +139,7 @@ func NewGeneratorWithSectors(numSectors int) (*ChainGen, error) {
return nil, xerrors.Errorf("failed to get metadata datastore: %w", err) return nil, xerrors.Errorf("failed to get metadata datastore: %w", err)
} }
bs, err := lr.Blockstore(repo.BlockstoreChain) bs, err := lr.Blockstore(context.TODO(), repo.BlockstoreChain)
if err != nil { if err != nil {
return nil, err return nil, err
} }

View File

@ -175,7 +175,7 @@ var chainBalanceStateCmd = &cli.Command{
defer lkrepo.Close() //nolint:errcheck defer lkrepo.Close() //nolint:errcheck
bs, err := lkrepo.Blockstore(repo.BlockstoreChain) bs, err := lkrepo.Blockstore(ctx, repo.BlockstoreChain)
if err != nil { if err != nil {
return fmt.Errorf("failed to open blockstore: %w", err) return fmt.Errorf("failed to open blockstore: %w", err)
} }
@ -396,7 +396,7 @@ var chainPledgeCmd = &cli.Command{
defer lkrepo.Close() //nolint:errcheck defer lkrepo.Close() //nolint:errcheck
bs, err := lkrepo.Blockstore(repo.BlockstoreChain) bs, err := lkrepo.Blockstore(ctx, repo.BlockstoreChain)
if err != nil { if err != nil {
return xerrors.Errorf("failed to open blockstore: %w", err) return xerrors.Errorf("failed to open blockstore: %w", err)
} }

View File

@ -72,7 +72,7 @@ var exportChainCmd = &cli.Command{
defer fi.Close() //nolint:errcheck defer fi.Close() //nolint:errcheck
bs, err := lr.Blockstore(repo.BlockstoreChain) bs, err := lr.Blockstore(ctx, repo.BlockstoreChain)
if err != nil { if err != nil {
return fmt.Errorf("failed to open blockstore: %w", err) return fmt.Errorf("failed to open blockstore: %w", err)
} }

View File

@ -1,6 +1,7 @@
package main package main
import ( import (
"context"
"encoding/hex" "encoding/hex"
"fmt" "fmt"
"io" "io"
@ -24,6 +25,8 @@ var importCarCmd = &cli.Command{
return xerrors.Errorf("opening fs repo: %w", err) return xerrors.Errorf("opening fs repo: %w", err)
} }
ctx := context.TODO()
exists, err := r.Exists() exists, err := r.Exists()
if err != nil { if err != nil {
return err return err
@ -44,7 +47,7 @@ var importCarCmd = &cli.Command{
return xerrors.Errorf("opening the car file: %w", err) return xerrors.Errorf("opening the car file: %w", err)
} }
bs, err := lr.Blockstore(repo.BlockstoreChain) bs, err := lr.Blockstore(ctx, repo.BlockstoreChain)
if err != nil { if err != nil {
return err return err
} }
@ -99,6 +102,8 @@ var importObjectCmd = &cli.Command{
return xerrors.Errorf("opening fs repo: %w", err) return xerrors.Errorf("opening fs repo: %w", err)
} }
ctx := context.TODO()
exists, err := r.Exists() exists, err := r.Exists()
if err != nil { if err != nil {
return err return err
@ -113,7 +118,7 @@ var importObjectCmd = &cli.Command{
} }
defer lr.Close() //nolint:errcheck defer lr.Close() //nolint:errcheck
bs, err := lr.Blockstore(repo.BlockstoreChain) bs, err := lr.Blockstore(ctx, repo.BlockstoreChain)
if err != nil { if err != nil {
return fmt.Errorf("failed to open blockstore: %w", err) return fmt.Errorf("failed to open blockstore: %w", err)
} }

View File

@ -131,7 +131,7 @@ var stateTreePruneCmd = &cli.Command{
defer lkrepo.Close() //nolint:errcheck defer lkrepo.Close() //nolint:errcheck
bs, err := lkrepo.Blockstore(repo.BlockstoreChain) bs, err := lkrepo.Blockstore(ctx, repo.BlockstoreChain)
if err != nil { if err != nil {
return fmt.Errorf("failed to open blockstore: %w", err) return fmt.Errorf("failed to open blockstore: %w", err)
} }

View File

@ -235,7 +235,7 @@ var DaemonCmd = &cli.Command{
issnapshot = true issnapshot = true
} }
if err := ImportChain(r, chainfile, issnapshot); err != nil { if err := ImportChain(ctx, r, chainfile, issnapshot); err != nil {
return err return err
} }
if cctx.Bool("halt-after-import") { if cctx.Bool("halt-after-import") {
@ -370,7 +370,7 @@ func importKey(ctx context.Context, api api.FullNode, f string) error {
return nil return nil
} }
func ImportChain(r repo.Repo, fname string, snapshot bool) (err error) { func ImportChain(ctx context.Context, r repo.Repo, fname string, snapshot bool) (err error) {
var rd io.Reader var rd io.Reader
var l int64 var l int64
if strings.HasPrefix(fname, "http://") || strings.HasPrefix(fname, "https://") { if strings.HasPrefix(fname, "http://") || strings.HasPrefix(fname, "https://") {
@ -413,7 +413,7 @@ func ImportChain(r repo.Repo, fname string, snapshot bool) (err error) {
} }
defer lr.Close() //nolint:errcheck defer lr.Close() //nolint:errcheck
bs, err := lr.Blockstore(repo.BlockstoreChain) bs, err := lr.Blockstore(ctx, repo.BlockstoreChain)
if err != nil { if err != nil {
return xerrors.Errorf("failed to open blockstore: %w", err) return xerrors.Errorf("failed to open blockstore: %w", err)
} }
@ -454,7 +454,7 @@ func ImportChain(r repo.Repo, fname string, snapshot bool) (err error) {
return xerrors.Errorf("flushing validation cache failed: %w", err) return xerrors.Errorf("flushing validation cache failed: %w", err)
} }
gb, err := cst.GetTipsetByHeight(context.TODO(), 0, ts, true) gb, err := cst.GetTipsetByHeight(ctx, 0, ts, true)
if err != nil { if err != nil {
return err return err
} }
@ -468,13 +468,13 @@ func ImportChain(r repo.Repo, fname string, snapshot bool) (err error) {
if !snapshot { if !snapshot {
log.Infof("validating imported chain...") log.Infof("validating imported chain...")
if err := stm.ValidateChain(context.TODO(), ts); err != nil { if err := stm.ValidateChain(ctx, ts); err != nil {
return xerrors.Errorf("chain validation failed: %w", err) return xerrors.Errorf("chain validation failed: %w", err)
} }
} }
log.Infof("accepting %s as new head", ts.Cids()) log.Infof("accepting %s as new head", ts.Cids())
if err := cst.ForceHeadSilent(context.Background(), ts); err != nil { if err := cst.ForceHeadSilent(ctx, ts); err != nil {
return err return err
} }

View File

@ -77,7 +77,7 @@ func MessagePool(lc fx.Lifecycle, sm *stmgr.StateManager, ps *pubsub.PubSub, ds
} }
func ChainRawBlockstore(lc fx.Lifecycle, mctx helpers.MetricsCtx, r repo.LockedRepo) (dtypes.ChainRawBlockstore, error) { func ChainRawBlockstore(lc fx.Lifecycle, mctx helpers.MetricsCtx, r repo.LockedRepo) (dtypes.ChainRawBlockstore, error) {
bs, err := r.Blockstore(repo.BlockstoreChain) bs, err := r.Blockstore(helpers.LifecycleCtx(mctx, lc), repo.BlockstoreChain)
if err != nil { if err != nil {
return nil, err return nil, err
} }

View File

@ -2,6 +2,7 @@ package repo
import ( import (
"bytes" "bytes"
"context"
"encoding/json" "encoding/json"
"fmt" "fmt"
"io" "io"
@ -299,7 +300,7 @@ func (fsr *fsLockedRepo) Close() error {
} }
// Blockstore returns a blockstore for the provided data domain. // Blockstore returns a blockstore for the provided data domain.
func (fsr *fsLockedRepo) Blockstore(domain BlockstoreDomain) (blockstore.Blockstore, error) { func (fsr *fsLockedRepo) Blockstore(ctx context.Context, domain BlockstoreDomain) (blockstore.Blockstore, error) {
if domain != BlockstoreChain { if domain != BlockstoreChain {
return nil, ErrInvalidBlockstoreDomain return nil, ErrInvalidBlockstoreDomain
} }

View File

@ -1,6 +1,7 @@
package repo package repo
import ( import (
"context"
"errors" "errors"
"github.com/filecoin-project/lotus/lib/blockstore" "github.com/filecoin-project/lotus/lib/blockstore"
@ -54,7 +55,7 @@ type LockedRepo interface {
Datastore(namespace string) (datastore.Batching, error) Datastore(namespace string) (datastore.Batching, error)
// Blockstore returns an IPLD blockstore for the requested domain. // Blockstore returns an IPLD blockstore for the requested domain.
Blockstore(domain BlockstoreDomain) (blockstore.Blockstore, error) Blockstore(ctx context.Context, domain BlockstoreDomain) (blockstore.Blockstore, error)
// Returns config in this repo // Returns config in this repo
Config() (interface{}, error) Config() (interface{}, error)

View File

@ -1,6 +1,7 @@
package repo package repo
import ( import (
"context"
"encoding/json" "encoding/json"
"io/ioutil" "io/ioutil"
"os" "os"
@ -244,7 +245,7 @@ func (lmem *lockedMemRepo) Datastore(ns string) (datastore.Batching, error) {
return namespace.Wrap(lmem.mem.datastore, datastore.NewKey(ns)), nil return namespace.Wrap(lmem.mem.datastore, datastore.NewKey(ns)), nil
} }
func (lmem *lockedMemRepo) Blockstore(domain BlockstoreDomain) (blockstore.Blockstore, error) { func (lmem *lockedMemRepo) Blockstore(ctx context.Context, domain BlockstoreDomain) (blockstore.Blockstore, error) {
if domain != BlockstoreChain { if domain != BlockstoreChain {
return nil, ErrInvalidBlockstoreDomain return nil, ErrInvalidBlockstoreDomain
} }