Ranged export: remove non-internal ranged export method
Additionally, per feedback: * Set "admin" permissions * Remove from v0api
This commit is contained in:
parent
1bb698619c
commit
ec01036871
@ -173,9 +173,15 @@ type FullNode interface {
|
||||
// If oldmsgskip is set, messages from before the requested roots are also not included.
|
||||
ChainExport(ctx context.Context, nroots abi.ChainEpoch, oldmsgskip bool, tsk types.TipSetKey) (<-chan []byte, error) //perm:read
|
||||
|
||||
ChainExportRange(ctx context.Context, head, tail types.TipSetKey, cfg ChainExportConfig) (<-chan []byte, error) //perm:read
|
||||
|
||||
ChainExportRangeInternal(ctx context.Context, head, tail types.TipSetKey, cfg ChainExportConfig) error //perm:read
|
||||
// ChainExportRangeInternal triggers the export of a chain
|
||||
// CAR-snapshot directly to disk. It is similar to ChainExport,
|
||||
// except, depending on options, the snapshot can include receipts,
|
||||
// messages and stateroots for the length between the specified head
|
||||
// and tail, thus producing "archival-grade" snapshots that include
|
||||
// all the on-chain data. The header chain is included back to
|
||||
// genesis and these snapshots can be used to initialize Filecoin
|
||||
// nodes.
|
||||
ChainExportRangeInternal(ctx context.Context, head, tail types.TipSetKey, cfg ChainExportConfig) error //perm:admin
|
||||
|
||||
// ChainPrune prunes the stored chain state and garbage collects; only supported if you
|
||||
// are using the splitstore
|
||||
|
@ -138,9 +138,7 @@ type FullNodeMethods struct {
|
||||
|
||||
ChainExport func(p0 context.Context, p1 abi.ChainEpoch, p2 bool, p3 types.TipSetKey) (<-chan []byte, error) `perm:"read"`
|
||||
|
||||
ChainExportRange func(p0 context.Context, p1 types.TipSetKey, p2 types.TipSetKey, p3 ChainExportConfig) (<-chan []byte, error) `perm:"read"`
|
||||
|
||||
ChainExportRangeInternal func(p0 context.Context, p1 types.TipSetKey, p2 types.TipSetKey, p3 ChainExportConfig) error `perm:"read"`
|
||||
ChainExportRangeInternal func(p0 context.Context, p1 types.TipSetKey, p2 types.TipSetKey, p3 ChainExportConfig) error `perm:"admin"`
|
||||
|
||||
ChainGetBlock func(p0 context.Context, p1 cid.Cid) (*types.BlockHeader, error) `perm:"read"`
|
||||
|
||||
@ -1437,17 +1435,6 @@ func (s *FullNodeStub) ChainExport(p0 context.Context, p1 abi.ChainEpoch, p2 boo
|
||||
return nil, ErrNotSupported
|
||||
}
|
||||
|
||||
func (s *FullNodeStruct) ChainExportRange(p0 context.Context, p1 types.TipSetKey, p2 types.TipSetKey, p3 ChainExportConfig) (<-chan []byte, error) {
|
||||
if s.Internal.ChainExportRange == nil {
|
||||
return nil, ErrNotSupported
|
||||
}
|
||||
return s.Internal.ChainExportRange(p0, p1, p2, p3)
|
||||
}
|
||||
|
||||
func (s *FullNodeStub) ChainExportRange(p0 context.Context, p1 types.TipSetKey, p2 types.TipSetKey, p3 ChainExportConfig) (<-chan []byte, error) {
|
||||
return nil, ErrNotSupported
|
||||
}
|
||||
|
||||
func (s *FullNodeStruct) ChainExportRangeInternal(p0 context.Context, p1 types.TipSetKey, p2 types.TipSetKey, p3 ChainExportConfig) error {
|
||||
if s.Internal.ChainExportRangeInternal == nil {
|
||||
return ErrNotSupported
|
||||
|
@ -161,10 +161,6 @@ type FullNode interface {
|
||||
// If oldmsgskip is set, messages from before the requested roots are also not included.
|
||||
ChainExport(ctx context.Context, nroots abi.ChainEpoch, oldmsgskip bool, tsk types.TipSetKey) (<-chan []byte, error) //perm:read
|
||||
|
||||
ChainExportRange(ctx context.Context, head, tail types.TipSetKey, cfg api.ChainExportConfig) (<-chan []byte, error) //perm:read
|
||||
|
||||
ChainExportRangeInternal(ctx context.Context, head, tail types.TipSetKey, cfg api.ChainExportConfig) error //perm:read
|
||||
|
||||
// MethodGroup: Beacon
|
||||
// The Beacon method group contains methods for interacting with the random beacon (DRAND)
|
||||
|
||||
|
@ -47,10 +47,6 @@ type FullNodeMethods struct {
|
||||
|
||||
ChainExport func(p0 context.Context, p1 abi.ChainEpoch, p2 bool, p3 types.TipSetKey) (<-chan []byte, error) `perm:"read"`
|
||||
|
||||
ChainExportRange func(p0 context.Context, p1 types.TipSetKey, p2 types.TipSetKey, p3 api.ChainExportConfig) (<-chan []byte, error) `perm:"read"`
|
||||
|
||||
ChainExportRangeInternal func(p0 context.Context, p1 types.TipSetKey, p2 types.TipSetKey, p3 api.ChainExportConfig) error `perm:"read"`
|
||||
|
||||
ChainGetBlock func(p0 context.Context, p1 cid.Cid) (*types.BlockHeader, error) `perm:"read"`
|
||||
|
||||
ChainGetBlockMessages func(p0 context.Context, p1 cid.Cid) (*api.BlockMessages, error) `perm:"read"`
|
||||
@ -534,28 +530,6 @@ func (s *FullNodeStub) ChainExport(p0 context.Context, p1 abi.ChainEpoch, p2 boo
|
||||
return nil, ErrNotSupported
|
||||
}
|
||||
|
||||
func (s *FullNodeStruct) ChainExportRange(p0 context.Context, p1 types.TipSetKey, p2 types.TipSetKey, p3 api.ChainExportConfig) (<-chan []byte, error) {
|
||||
if s.Internal.ChainExportRange == nil {
|
||||
return nil, ErrNotSupported
|
||||
}
|
||||
return s.Internal.ChainExportRange(p0, p1, p2, p3)
|
||||
}
|
||||
|
||||
func (s *FullNodeStub) ChainExportRange(p0 context.Context, p1 types.TipSetKey, p2 types.TipSetKey, p3 api.ChainExportConfig) (<-chan []byte, error) {
|
||||
return nil, ErrNotSupported
|
||||
}
|
||||
|
||||
func (s *FullNodeStruct) ChainExportRangeInternal(p0 context.Context, p1 types.TipSetKey, p2 types.TipSetKey, p3 api.ChainExportConfig) error {
|
||||
if s.Internal.ChainExportRangeInternal == nil {
|
||||
return ErrNotSupported
|
||||
}
|
||||
return s.Internal.ChainExportRangeInternal(p0, p1, p2, p3)
|
||||
}
|
||||
|
||||
func (s *FullNodeStub) ChainExportRangeInternal(p0 context.Context, p1 types.TipSetKey, p2 types.TipSetKey, p3 api.ChainExportConfig) error {
|
||||
return ErrNotSupported
|
||||
}
|
||||
|
||||
func (s *FullNodeStruct) ChainGetBlock(p0 context.Context, p1 cid.Cid) (*types.BlockHeader, error) {
|
||||
if s.Internal.ChainGetBlock == nil {
|
||||
return nil, ErrNotSupported
|
||||
|
49
cli/chain.go
49
cli/chain.go
@ -1188,12 +1188,14 @@ var ChainExportRangeCmd = &cli.Command{
|
||||
Value: 1 << 20,
|
||||
},
|
||||
&cli.BoolFlag{
|
||||
Name: "internal",
|
||||
Usage: "will cause the daemon to write the file locally",
|
||||
Name: "internal",
|
||||
Usage: "write the file locally to disk",
|
||||
Value: true,
|
||||
Hidden: true, // currently, non-internal export is not implemented.
|
||||
},
|
||||
},
|
||||
Action: func(cctx *cli.Context) error {
|
||||
api, closer, err := GetFullNodeAPI(cctx)
|
||||
api, closer, err := GetFullNodeAPIV1(cctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@ -1234,20 +1236,11 @@ var ChainExportRangeCmd = &cli.Command{
|
||||
return errors.New("Height of --head tipset must be greater or equal to the height of the --tail tipset")
|
||||
}
|
||||
|
||||
if cctx.Bool("internal") {
|
||||
if err := api.ChainExportRangeInternal(ctx, head.Key(), tail.Key(), lapi.ChainExportConfig{
|
||||
WriteBufferSize: cctx.Int("write-buffer"),
|
||||
NumWorkers: cctx.Int("workers"),
|
||||
IncludeMessages: cctx.Bool("messages"),
|
||||
IncludeReceipts: cctx.Bool("receipts"),
|
||||
IncludeStateRoots: cctx.Bool("stateroots"),
|
||||
}); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
if !cctx.Bool("internal") {
|
||||
return errors.New("Non-internal exports are not implemented")
|
||||
}
|
||||
|
||||
stream, err := api.ChainExportRange(ctx, head.Key(), tail.Key(), lapi.ChainExportConfig{
|
||||
err = api.ChainExportRangeInternal(ctx, head.Key(), tail.Key(), lapi.ChainExportConfig{
|
||||
WriteBufferSize: cctx.Int("write-buffer"),
|
||||
NumWorkers: cctx.Int("workers"),
|
||||
IncludeMessages: cctx.Bool("messages"),
|
||||
@ -1257,32 +1250,6 @@ var ChainExportRangeCmd = &cli.Command{
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
fi, err := createExportFile(cctx.App, cctx.Args().First())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer func() {
|
||||
err := fi.Close()
|
||||
if err != nil {
|
||||
fmt.Printf("error closing output file: %+v", err)
|
||||
}
|
||||
}()
|
||||
|
||||
var last bool
|
||||
for b := range stream {
|
||||
last = len(b) == 0
|
||||
|
||||
_, err := fi.Write(b)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
if !last {
|
||||
return xerrors.Errorf("incomplete export (remote connection lost?)")
|
||||
}
|
||||
|
||||
return nil
|
||||
},
|
||||
}
|
||||
|
@ -635,69 +635,9 @@ func (a ChainAPI) ChainExportRangeInternal(ctx context.Context, head, tail types
|
||||
return fmt.Errorf("exporting chain range: %w", err)
|
||||
}
|
||||
|
||||
// FIXME: return progress.
|
||||
return nil
|
||||
}
|
||||
|
||||
func (a ChainAPI) ChainExportRange(ctx context.Context, head, tail types.TipSetKey, cfg api.ChainExportConfig) (<-chan []byte, error) {
|
||||
headTs, err := a.Chain.GetTipSetFromKey(ctx, head)
|
||||
if err != nil {
|
||||
return nil, xerrors.Errorf("loading tipset %s: %w", head, err)
|
||||
}
|
||||
tailTs, err := a.Chain.GetTipSetFromKey(ctx, tail)
|
||||
if err != nil {
|
||||
return nil, xerrors.Errorf("loading tipset %s: %w", tail, err)
|
||||
}
|
||||
r, w := io.Pipe()
|
||||
out := make(chan []byte)
|
||||
go func() {
|
||||
bw := bufio.NewWriterSize(w, cfg.WriteBufferSize)
|
||||
err := a.Chain.ExportRange(
|
||||
ctx,
|
||||
bw,
|
||||
headTs,
|
||||
tailTs,
|
||||
cfg.IncludeMessages, cfg.IncludeReceipts, cfg.IncludeStateRoots,
|
||||
cfg.NumWorkers,
|
||||
)
|
||||
bw.Flush() //nolint:errcheck // it is a write to a pipe
|
||||
w.CloseWithError(err) //nolint:errcheck // it is a pipe
|
||||
}()
|
||||
|
||||
go func() {
|
||||
defer close(out)
|
||||
for {
|
||||
buf := make([]byte, cfg.WriteBufferSize)
|
||||
n, err := r.Read(buf)
|
||||
if err != nil && err != io.EOF {
|
||||
log.Errorf("chain export pipe read failed: %s", err)
|
||||
return
|
||||
}
|
||||
if n > 0 {
|
||||
select {
|
||||
case out <- buf[:n]:
|
||||
case <-ctx.Done():
|
||||
log.Warnf("export writer failed: %s", ctx.Err())
|
||||
return
|
||||
}
|
||||
}
|
||||
if err == io.EOF {
|
||||
// send empty slice to indicate correct eof
|
||||
select {
|
||||
case out <- []byte{}:
|
||||
case <-ctx.Done():
|
||||
log.Warnf("export writer failed: %s", ctx.Err())
|
||||
return
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
return out, nil
|
||||
}
|
||||
|
||||
func (a *ChainAPI) ChainExport(ctx context.Context, nroots abi.ChainEpoch, skipoldmsgs bool, tsk types.TipSetKey) (<-chan []byte, error) {
|
||||
ts, err := a.Chain.GetTipSetFromKey(ctx, tsk)
|
||||
if err != nil {
|
||||
|
Loading…
Reference in New Issue
Block a user