implement halting as state machine

This commit is contained in:
Yusef Napora 2020-06-29 19:18:12 -04:00
parent 6c4e83f8a2
commit 8aef427fd8
5 changed files with 251 additions and 21 deletions

View File

@ -68,6 +68,5 @@
[groups.run.test_params] [groups.run.test_params]
role = "drand" role = "drand"
drand_period = "1s" drand_period = "1s"
drand_halt_duration = "1m"
drand_halt_begin = "10s"
drand_log_level = "none" drand_log_level = "none"
suspend_events = "wait 20s -> halt -> wait 45s -> resume -> wait 5s -> halt -> wait 1m -> resume"

View File

@ -74,7 +74,6 @@ func (t *TestEnvironment) DebugSpew(format string, args... interface{}) {
t.RecordMessage(spew.Sprintf(format, args...)) t.RecordMessage(spew.Sprintf(format, args...))
} }
type Node struct { type Node struct {
fullApi api.FullNode fullApi api.FullNode
minerApi api.StorageMiner minerApi api.StorageMiner

View File

@ -24,6 +24,8 @@ import (
"github.com/libp2p/go-libp2p-core/peer" "github.com/libp2p/go-libp2p-core/peer"
ma "github.com/multiformats/go-multiaddr" ma "github.com/multiformats/go-multiaddr"
"github.com/testground/sdk-go/sync" "github.com/testground/sdk-go/sync"
"github.com/filecoin-project/oni/lotus-soup/statemachine"
) )
var ( var (
@ -121,14 +123,14 @@ func (dr *DrandInstance) RunDKG(nodes, thr int, timeout string, leader bool, lea
return kg return kg
} }
func (dr *DrandInstance) Halt(duration time.Duration) { func (dr *DrandInstance) Halt() {
dr.t.RecordMessage("drand node %d halting for %s", dr.t.GroupSeq, duration.String()) dr.t.RecordMessage("drand node #%d halting", dr.t.GroupSeq)
dr.daemon.StopBeacon() dr.daemon.StopBeacon()
}
time.AfterFunc(duration, func() { func (dr *DrandInstance) Resume() {
dr.t.RecordMessage("drand node %d coming back online", dr.t.GroupSeq) dr.t.RecordMessage("drand node #d resuming", dr.t.GroupSeq)
dr.daemon.StartBeacon(true) dr.daemon.StartBeacon(true)
})
} }
func runDrandNode(t *TestEnvironment) error { func runDrandNode(t *TestEnvironment) error {
@ -142,15 +144,9 @@ func runDrandNode(t *TestEnvironment) error {
ctx := context.Background() ctx := context.Background()
t.SyncClient.MustSignalAndWait(ctx, stateReady, t.TestInstanceCount) t.SyncClient.MustSignalAndWait(ctx, stateReady, t.TestInstanceCount)
haltDuration := time.Duration(0) if t.IsParamSet("suspend_events") {
if t.IsParamSet("drand_halt_duration") { suspender := statemachine.NewSuspender(dr, t.RecordMessage)
haltDuration = t.DurationParam("drand_halt_duration") suspender.RunEvents(t.StringParam("suspend_events"))
}
if haltDuration != 0 {
startTime := t.DurationParam("drand_halt_begin")
time.AfterFunc(startTime, func() {
dr.Halt(haltDuration)
})
} }
t.SyncClient.MustSignalAndWait(ctx, stateDone, t.TestInstanceCount) t.SyncClient.MustSignalAndWait(ctx, stateDone, t.TestInstanceCount)

View File

@ -0,0 +1,108 @@
package statemachine
import (
"errors"
"sync"
)
// This code has been shamelessly lifted from this blog post:
// https://venilnoronha.io/a-simple-state-machine-framework-in-go
// Many thanks to the author, Venil Norohnha
// ErrEventRejected is the error returned when the state machine cannot process
// an event in the state that it is in.
var ErrEventRejected = errors.New("event rejected")
const (
// Default represents the default state of the system.
Default StateType = ""
// NoOp represents a no-op event.
NoOp EventType = "NoOp"
)
// StateType represents an extensible state type in the state machine.
type StateType string
// EventType represents an extensible event type in the state machine.
type EventType string
// EventContext represents the context to be passed to the action implementation.
type EventContext interface{}
// Action represents the action to be executed in a given state.
type Action interface {
Execute(eventCtx EventContext) EventType
}
// Events represents a mapping of events and states.
type Events map[EventType]StateType
// State binds a state with an action and a set of events it can handle.
type State struct {
Action Action
Events Events
}
// States represents a mapping of states and their implementations.
type States map[StateType]State
// StateMachine represents the state machine.
type StateMachine struct {
// Previous represents the previous state.
Previous StateType
// Current represents the current state.
Current StateType
// States holds the configuration of states and events handled by the state machine.
States States
// mutex ensures that only 1 event is processed by the state machine at any given time.
mutex sync.Mutex
}
// getNextState returns the next state for the event given the machine's current
// state, or an error if the event can't be handled in the given state.
func (s *StateMachine) getNextState(event EventType) (StateType, error) {
if state, ok := s.States[s.Current]; ok {
if state.Events != nil {
if next, ok := state.Events[event]; ok {
return next, nil
}
}
}
return Default, ErrEventRejected
}
// SendEvent sends an event to the state machine.
func (s *StateMachine) SendEvent(event EventType, eventCtx EventContext) error {
s.mutex.Lock()
defer s.mutex.Unlock()
for {
// Determine the next state for the event given the machine's current state.
nextState, err := s.getNextState(event)
if err != nil {
return ErrEventRejected
}
// Identify the state definition for the next state.
state, ok := s.States[nextState]
if !ok || state.Action == nil {
// configuration error
}
// Transition over to the next state.
s.Previous = s.Current
s.Current = nextState
// Execute the next state's action and loop over again if the event returned
// is not a no-op.
nextEvent := state.Action.Execute(eventCtx)
if nextEvent == NoOp {
return nil
}
event = nextEvent
}
}

View File

@ -0,0 +1,128 @@
package statemachine
import (
"fmt"
"strings"
"time"
)
const (
Running StateType = "running"
Suspended StateType = "suspended"
Halt EventType = "halt"
Resume EventType = "resume"
)
type Suspendable interface {
Halt()
Resume()
}
type HaltAction struct{}
func (a *HaltAction) Execute(ctx EventContext) EventType {
s, ok := ctx.(*Suspender)
if !ok {
fmt.Println("unable to halt, event context is not Suspendable")
return NoOp
}
s.target.Halt()
return NoOp
}
type ResumeAction struct{}
func (a *ResumeAction) Execute(ctx EventContext) EventType {
s, ok := ctx.(*Suspender)
if !ok {
fmt.Println("unable to resume, event context is not Suspendable")
return NoOp
}
s.target.Resume()
return NoOp
}
type Suspender struct {
StateMachine
target Suspendable
log LogFn
}
type LogFn func(fmt string, args ...interface{})
func NewSuspender(target Suspendable, log LogFn) *Suspender {
return &Suspender{
target: target,
log: log,
StateMachine: StateMachine{
Current: Running,
States: States{
Running: State{
Action: &ResumeAction{},
Events: Events{
Halt: Suspended,
},
},
Suspended: State{
Action: &HaltAction{},
Events: Events{
Resume: Running,
},
},
},
},
}
}
func (s *Suspender) RunEvents(eventSpec string) {
s.log("running event spec: %s", eventSpec)
for _, et := range parseEventSpec(eventSpec, s.log) {
if et.delay != 0 {
//s.log("waiting %s", et.delay.String())
time.Sleep(et.delay)
continue
}
if et.event == "" {
s.log("ignoring empty event")
continue
}
s.log("sending event %s", et.event)
err := s.SendEvent(et.event, s)
if err != nil {
s.log("error sending event %s: %s", et.event, err)
}
}
}
type eventTiming struct {
delay time.Duration
event EventType
}
func parseEventSpec(spec string, log LogFn) []eventTiming {
fields := strings.Split(spec, "->")
out := make([]eventTiming, 0, len(fields))
for _, f := range fields {
f = strings.TrimSpace(f)
words := strings.Split(f, " ")
// TODO: try to implement a "waiting" state instead of special casing like this
if words[0] == "wait" {
if len(words) != 2 {
log("expected 'wait' to be followed by duration, e.g. 'wait 30s'. ignoring.")
continue
}
d, err := time.ParseDuration(words[1])
if err != nil {
log("bad argument for 'wait': %s", err)
continue
}
out = append(out, eventTiming{delay: d})
} else {
out = append(out, eventTiming{event: EventType(words[0])})
}
}
return out
}