Gracefully exit geth command

This commit is contained in:
Elizabeth Engelman 2019-01-28 12:47:52 -06:00
parent b2ba24509b
commit b6dc2d509e
3 changed files with 81 additions and 25 deletions

View File

@ -49,34 +49,67 @@ func (StateDiffService) APIs() []rpc.API {
return []rpc.API{} return []rpc.API{}
} }
func (sds *StateDiffService) Loop(events chan core.ChainEvent) { func (sds *StateDiffService) Loop(chainEventCh chan core.ChainEvent) {
for elem := range events { chainEventSub := sds.BlockChain.SubscribeChainEvent(chainEventCh)
currentBlock := elem.Block defer chainEventSub.Unsubscribe()
parentHash := currentBlock.ParentHash()
parentBlock := sds.BlockChain.GetBlockByHash(parentHash)
stateDiffLocation, err := sds.Extractor.ExtractStateDiff(*parentBlock, *currentBlock) blocksCh := make(chan *types.Block)
if err != nil { errCh := chainEventSub.Err()
log.Error("Error extracting statediff", "block number", currentBlock.Number(), "error", err) quitCh := make(chan struct{})
} else {
log.Info("Statediff extracted", "block number", currentBlock.Number(), "location", stateDiffLocation) go func() {
HandleLoop:
for {
select {
//Notify chain event channel of events
case chainEvent := <-chainEventCh:
log.Debug("Event received from chainEventCh", "event", chainEvent)
blocksCh <- chainEvent.Block
//if node stopped
case err := <-errCh:
log.Debug("Error from chain event subscription, breaking loop.", "error", err)
break HandleLoop
}
}
close(quitCh)
}()
//loop through chain events until no more
for {
select {
case block := <-blocksCh:
currentBlock := block
parentHash := currentBlock.ParentHash()
parentBlock := sds.BlockChain.GetBlockByHash(parentHash)
if parentBlock == nil {
log.Warn("Parent block is nil, skipping this block",
"parent block hash", parentHash.String(),
"current block number", currentBlock.Number())
break
}
stateDiffLocation, err := sds.Extractor.ExtractStateDiff(*parentBlock, *currentBlock)
if err != nil {
log.Error("Error extracting statediff", "block number", currentBlock.Number(), "error", err)
} else {
log.Info("Statediff extracted", "block number", currentBlock.Number(), "location", stateDiffLocation)
}
case <-quitCh:
return
} }
} }
} }
var eventsChannel chan core.ChainEvent
func (sds *StateDiffService) Start(server *p2p.Server) error { func (sds *StateDiffService) Start(server *p2p.Server) error {
log.Info("Starting statediff service") log.Info("Starting statediff service")
eventsChannel := make(chan core.ChainEvent, 10)
sds.BlockChain.SubscribeChainEvent(eventsChannel) chainEventCh := make(chan core.ChainEvent, 10)
go sds.Loop(eventsChannel) go sds.Loop(chainEventCh)
return nil return nil
} }
func (StateDiffService) Stop() error { func (StateDiffService) Stop() error {
log.Info("Stopping statediff service") log.Info("Stopping statediff service")
close(eventsChannel)
return nil return nil
} }

View File

@ -9,7 +9,7 @@ import (
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/types"
service2 "github.com/ethereum/go-ethereum/statediff/service" s "github.com/ethereum/go-ethereum/statediff/service"
"github.com/ethereum/go-ethereum/statediff/testhelpers/mocks" "github.com/ethereum/go-ethereum/statediff/testhelpers/mocks"
) )
@ -18,7 +18,7 @@ func TestServiceLoop(t *testing.T) {
} }
var ( var (
eventsChannel = make(chan core.ChainEvent, 10) eventsChannel = make(chan core.ChainEvent)
parentHeader1 = types.Header{Number: big.NewInt(rand.Int63())} parentHeader1 = types.Header{Number: big.NewInt(rand.Int63())}
parentHeader2 = types.Header{Number: big.NewInt(rand.Int63())} parentHeader2 = types.Header{Number: big.NewInt(rand.Int63())}
@ -40,20 +40,19 @@ var (
) )
func testServiceLoop(t *testing.T) { func testServiceLoop(t *testing.T) {
eventsChannel <- event1
eventsChannel <- event2
extractor := mocks.Extractor{} extractor := mocks.Extractor{}
close(eventsChannel) //close(eventsChannel)
blockChain := mocks.BlockChain{} blockChain := mocks.BlockChain{}
service := service2.StateDiffService{ service := s.StateDiffService{
Builder: nil, Builder: nil,
Extractor: &extractor, Extractor: &extractor,
BlockChain: &blockChain, BlockChain: &blockChain,
} }
blockChain.SetParentBlockToReturn([]*types.Block{parentBlock1, parentBlock2}) blockChain.SetParentBlockToReturn([]*types.Block{parentBlock1, parentBlock2})
blockChain.SetChainEvents([]core.ChainEvent{event1, event2})
service.Loop(eventsChannel) service.Loop(eventsChannel)
//parent and current blocks are passed to the extractor //parent and current blocks are passed to the extractor

View File

@ -5,12 +5,14 @@ import (
"github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/event"
"errors"
) )
type BlockChain struct { type BlockChain struct {
ParentHashesLookedUp []common.Hash ParentHashesLookedUp []common.Hash
parentBlocksToReturn []*types.Block parentBlocksToReturn []*types.Block
callCount int callCount int
ChainEvents []core.ChainEvent
} }
func (mc *BlockChain) SetParentBlockToReturn(blocks []*types.Block) { func (mc *BlockChain) SetParentBlockToReturn(blocks []*types.Block) {
@ -29,6 +31,28 @@ func (mc *BlockChain) GetBlockByHash(hash common.Hash) *types.Block {
return &parentBlock return &parentBlock
} }
func (BlockChain) SubscribeChainEvent(ch chan<- core.ChainEvent) event.Subscription { func (bc *BlockChain) SetChainEvents(chainEvents []core.ChainEvent) {
panic("implement me") bc.ChainEvents = chainEvents
}
func (bc *BlockChain) SubscribeChainEvent(ch chan<- core.ChainEvent) event.Subscription {
subErr := errors.New("Subscription Error")
var eventCounter int
subscription := event.NewSubscription(func(quit <-chan struct{}) error {
for _, chainEvent := range bc.ChainEvents {
if eventCounter > 1 {
return subErr
}
select {
case ch <- chainEvent:
case <-quit:
return nil
}
eventCounter++
}
return nil
})
return subscription
} }