Merge pull request #4179 from filecoin-project/fix/export-unclean-exit
chain export: Error with unfinished exports
This commit is contained in:
commit
405aba4d8f
@ -1070,13 +1070,20 @@ var chainExportCmd = &cli.Command{
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var last bool
|
||||||
for b := range stream {
|
for b := range stream {
|
||||||
|
last = len(b) == 0
|
||||||
|
|
||||||
_, err := fi.Write(b)
|
_, err := fi.Write(b)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if !last {
|
||||||
|
return xerrors.Errorf("incomplete export (remote connection lost?)")
|
||||||
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
@ -507,15 +507,11 @@ func (a *ChainAPI) ChainExport(ctx context.Context, nroots abi.ChainEpoch, skipo
|
|||||||
r, w := io.Pipe()
|
r, w := io.Pipe()
|
||||||
out := make(chan []byte)
|
out := make(chan []byte)
|
||||||
go func() {
|
go func() {
|
||||||
defer w.Close() //nolint:errcheck // it is a pipe
|
|
||||||
|
|
||||||
bw := bufio.NewWriterSize(w, 1<<20)
|
bw := bufio.NewWriterSize(w, 1<<20)
|
||||||
defer bw.Flush() //nolint:errcheck // it is a write to a pipe
|
|
||||||
|
|
||||||
if err := a.Chain.Export(ctx, ts, nroots, skipoldmsgs, bw); err != nil {
|
err := a.Chain.Export(ctx, ts, nroots, skipoldmsgs, bw)
|
||||||
log.Errorf("chain export call failed: %s", err)
|
bw.Flush() //nolint:errcheck // it is a write to a pipe
|
||||||
return
|
w.CloseWithError(err) //nolint:errcheck // it is a pipe
|
||||||
}
|
|
||||||
}()
|
}()
|
||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
@ -527,13 +523,23 @@ func (a *ChainAPI) ChainExport(ctx context.Context, nroots abi.ChainEpoch, skipo
|
|||||||
log.Errorf("chain export pipe read failed: %s", err)
|
log.Errorf("chain export pipe read failed: %s", err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
select {
|
if n > 0 {
|
||||||
case out <- buf[:n]:
|
select {
|
||||||
case <-ctx.Done():
|
case out <- buf[:n]:
|
||||||
log.Warnf("export writer failed: %s", ctx.Err())
|
case <-ctx.Done():
|
||||||
return
|
log.Warnf("export writer failed: %s", ctx.Err())
|
||||||
|
return
|
||||||
|
}
|
||||||
}
|
}
|
||||||
if err == io.EOF {
|
if err == io.EOF {
|
||||||
|
// send empty slice to indicate correct eof
|
||||||
|
select {
|
||||||
|
case out <- []byte{}:
|
||||||
|
case <-ctx.Done():
|
||||||
|
log.Warnf("export writer failed: %s", ctx.Err())
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user