core/rawdb: stop freezer process as part of freezer.Close() (#21010)
* core/rawdb: Stop freezer process as part of freezer.Close() When you call db.Close(), it was closing the leveldb database first, then closing the freezer, but never stopping the freezer process. This could cause the freezer to attempt to write to leveldb after leveldb had been closed, leading to a crash with a non-zero exit code. This change adds a quit channel to the freezer, and freezer.Close() will not return until the freezer process has stopped. Additionally, when you call freezerdb.Close(), it will close the AncientStore before closing leveldb, to ensure that the freezer goroutine will be stopped before leveldb is closed. * core/rawdb: Fix formatting for golint * core/rawdb: Use backoff flag to avoid repeating select * core/rawdb: Include accidentally omitted backoff
This commit is contained in:
parent
bd60295de5
commit
069a7e1f8a
@ -41,10 +41,10 @@ type freezerdb struct {
|
|||||||
// the slow ancient tables.
|
// the slow ancient tables.
|
||||||
func (frdb *freezerdb) Close() error {
|
func (frdb *freezerdb) Close() error {
|
||||||
var errs []error
|
var errs []error
|
||||||
if err := frdb.KeyValueStore.Close(); err != nil {
|
if err := frdb.AncientStore.Close(); err != nil {
|
||||||
errs = append(errs, err)
|
errs = append(errs, err)
|
||||||
}
|
}
|
||||||
if err := frdb.AncientStore.Close(); err != nil {
|
if err := frdb.KeyValueStore.Close(); err != nil {
|
||||||
errs = append(errs, err)
|
errs = append(errs, err)
|
||||||
}
|
}
|
||||||
if len(errs) != 0 {
|
if len(errs) != 0 {
|
||||||
|
@ -73,6 +73,7 @@ type freezer struct {
|
|||||||
|
|
||||||
tables map[string]*freezerTable // Data tables for storing everything
|
tables map[string]*freezerTable // Data tables for storing everything
|
||||||
instanceLock fileutil.Releaser // File-system lock to prevent double opens
|
instanceLock fileutil.Releaser // File-system lock to prevent double opens
|
||||||
|
quit chan struct{}
|
||||||
}
|
}
|
||||||
|
|
||||||
// newFreezer creates a chain freezer that moves ancient chain data into
|
// newFreezer creates a chain freezer that moves ancient chain data into
|
||||||
@ -101,6 +102,7 @@ func newFreezer(datadir string, namespace string) (*freezer, error) {
|
|||||||
freezer := &freezer{
|
freezer := &freezer{
|
||||||
tables: make(map[string]*freezerTable),
|
tables: make(map[string]*freezerTable),
|
||||||
instanceLock: lock,
|
instanceLock: lock,
|
||||||
|
quit: make(chan struct{}),
|
||||||
}
|
}
|
||||||
for name, disableSnappy := range freezerNoSnappy {
|
for name, disableSnappy := range freezerNoSnappy {
|
||||||
table, err := newTable(datadir, name, readMeter, writeMeter, sizeGauge, disableSnappy)
|
table, err := newTable(datadir, name, readMeter, writeMeter, sizeGauge, disableSnappy)
|
||||||
@ -126,6 +128,7 @@ func newFreezer(datadir string, namespace string) (*freezer, error) {
|
|||||||
|
|
||||||
// Close terminates the chain freezer, unmapping all the data files.
|
// Close terminates the chain freezer, unmapping all the data files.
|
||||||
func (f *freezer) Close() error {
|
func (f *freezer) Close() error {
|
||||||
|
f.quit <- struct{}{}
|
||||||
var errs []error
|
var errs []error
|
||||||
for _, table := range f.tables {
|
for _, table := range f.tables {
|
||||||
if err := table.Close(); err != nil {
|
if err := table.Close(); err != nil {
|
||||||
@ -254,35 +257,50 @@ func (f *freezer) Sync() error {
|
|||||||
func (f *freezer) freeze(db ethdb.KeyValueStore) {
|
func (f *freezer) freeze(db ethdb.KeyValueStore) {
|
||||||
nfdb := &nofreezedb{KeyValueStore: db}
|
nfdb := &nofreezedb{KeyValueStore: db}
|
||||||
|
|
||||||
|
backoff := false
|
||||||
for {
|
for {
|
||||||
|
select {
|
||||||
|
case <-f.quit:
|
||||||
|
log.Info("Freezer shutting down")
|
||||||
|
return
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
if backoff {
|
||||||
|
select {
|
||||||
|
case <-time.NewTimer(freezerRecheckInterval).C:
|
||||||
|
backoff = false
|
||||||
|
case <-f.quit:
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
// Retrieve the freezing threshold.
|
// Retrieve the freezing threshold.
|
||||||
hash := ReadHeadBlockHash(nfdb)
|
hash := ReadHeadBlockHash(nfdb)
|
||||||
if hash == (common.Hash{}) {
|
if hash == (common.Hash{}) {
|
||||||
log.Debug("Current full block hash unavailable") // new chain, empty database
|
log.Debug("Current full block hash unavailable") // new chain, empty database
|
||||||
time.Sleep(freezerRecheckInterval)
|
backoff = true
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
number := ReadHeaderNumber(nfdb, hash)
|
number := ReadHeaderNumber(nfdb, hash)
|
||||||
switch {
|
switch {
|
||||||
case number == nil:
|
case number == nil:
|
||||||
log.Error("Current full block number unavailable", "hash", hash)
|
log.Error("Current full block number unavailable", "hash", hash)
|
||||||
time.Sleep(freezerRecheckInterval)
|
backoff = true
|
||||||
continue
|
continue
|
||||||
|
|
||||||
case *number < params.ImmutabilityThreshold:
|
case *number < params.ImmutabilityThreshold:
|
||||||
log.Debug("Current full block not old enough", "number", *number, "hash", hash, "delay", params.ImmutabilityThreshold)
|
log.Debug("Current full block not old enough", "number", *number, "hash", hash, "delay", params.ImmutabilityThreshold)
|
||||||
time.Sleep(freezerRecheckInterval)
|
backoff = true
|
||||||
continue
|
continue
|
||||||
|
|
||||||
case *number-params.ImmutabilityThreshold <= f.frozen:
|
case *number-params.ImmutabilityThreshold <= f.frozen:
|
||||||
log.Debug("Ancient blocks frozen already", "number", *number, "hash", hash, "frozen", f.frozen)
|
log.Debug("Ancient blocks frozen already", "number", *number, "hash", hash, "frozen", f.frozen)
|
||||||
time.Sleep(freezerRecheckInterval)
|
backoff = true
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
head := ReadHeader(nfdb, hash, *number)
|
head := ReadHeader(nfdb, hash, *number)
|
||||||
if head == nil {
|
if head == nil {
|
||||||
log.Error("Current full block unavailable", "number", *number, "hash", hash)
|
log.Error("Current full block unavailable", "number", *number, "hash", hash)
|
||||||
time.Sleep(freezerRecheckInterval)
|
backoff = true
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
// Seems we have data ready to be frozen, process in usable batches
|
// Seems we have data ready to be frozen, process in usable batches
|
||||||
@ -369,7 +387,7 @@ func (f *freezer) freeze(db ethdb.KeyValueStore) {
|
|||||||
|
|
||||||
// Avoid database thrashing with tiny writes
|
// Avoid database thrashing with tiny writes
|
||||||
if f.frozen-first < freezerBatchLimit {
|
if f.frozen-first < freezerBatchLimit {
|
||||||
time.Sleep(freezerRecheckInterval)
|
backoff = true
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user