Merge pull request #4301 from filecoin-project/steb/fix-two-races

Fix two races
This commit is contained in:
Łukasz Magiera 2020-10-10 20:37:13 +02:00 committed by GitHub
commit a491fc97b9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 34 additions and 33 deletions

View File

@ -1726,7 +1726,7 @@ func VerifyElectionPoStVRF(ctx context.Context, worker address.Address, rand []b
return gen.VerifyVRF(ctx, worker, rand, evrf) return gen.VerifyVRF(ctx, worker, rand, evrf)
} }
func (syncer *Syncer) State() []SyncerState { func (syncer *Syncer) State() []SyncerStateSnapshot {
return syncer.syncmgr.State() return syncer.syncmgr.State()
} }

View File

@ -38,7 +38,7 @@ type SyncManager interface {
SetPeerHead(ctx context.Context, p peer.ID, ts *types.TipSet) SetPeerHead(ctx context.Context, p peer.ID, ts *types.TipSet)
// State retrieves the state of the sync workers. // State retrieves the state of the sync workers.
State() []SyncerState State() []SyncerStateSnapshot
} }
type syncManager struct { type syncManager struct {
@ -79,7 +79,7 @@ type syncResult struct {
const syncWorkerCount = 3 const syncWorkerCount = 3
func NewSyncManager(sync SyncFunc) SyncManager { func NewSyncManager(sync SyncFunc) SyncManager {
return &syncManager{ sm := &syncManager{
bspThresh: 1, bspThresh: 1,
peerHeads: make(map[peer.ID]*types.TipSet), peerHeads: make(map[peer.ID]*types.TipSet),
syncTargets: make(chan *types.TipSet), syncTargets: make(chan *types.TipSet),
@ -90,6 +90,10 @@ func NewSyncManager(sync SyncFunc) SyncManager {
doSync: sync, doSync: sync,
stop: make(chan struct{}), stop: make(chan struct{}),
} }
for i := range sm.syncStates {
sm.syncStates[i] = new(SyncerState)
}
return sm
} }
func (sm *syncManager) Start() { func (sm *syncManager) Start() {
@ -128,8 +132,8 @@ func (sm *syncManager) SetPeerHead(ctx context.Context, p peer.ID, ts *types.Tip
sm.incomingTipSets <- ts sm.incomingTipSets <- ts
} }
func (sm *syncManager) State() []SyncerState { func (sm *syncManager) State() []SyncerStateSnapshot {
ret := make([]SyncerState, 0, len(sm.syncStates)) ret := make([]SyncerStateSnapshot, 0, len(sm.syncStates))
for _, s := range sm.syncStates { for _, s := range sm.syncStates {
ret = append(ret, s.Snapshot()) ret = append(ret, s.Snapshot())
} }
@ -405,8 +409,7 @@ func (sm *syncManager) scheduleWorkSent() {
} }
func (sm *syncManager) syncWorker(id int) { func (sm *syncManager) syncWorker(id int) {
ss := &SyncerState{} ss := sm.syncStates[id]
sm.syncStates[id] = ss
for { for {
select { select {
case ts, ok := <-sm.syncTargets: case ts, ok := <-sm.syncTargets:

View File

@ -11,8 +11,7 @@ import (
"github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/chain/types"
) )
type SyncerState struct { type SyncerStateSnapshot struct {
lk sync.Mutex
Target *types.TipSet Target *types.TipSet
Base *types.TipSet Base *types.TipSet
Stage api.SyncStateStage Stage api.SyncStateStage
@ -22,6 +21,11 @@ type SyncerState struct {
End time.Time End time.Time
} }
type SyncerState struct {
lk sync.Mutex
data SyncerStateSnapshot
}
func (ss *SyncerState) SetStage(v api.SyncStateStage) { func (ss *SyncerState) SetStage(v api.SyncStateStage) {
if ss == nil { if ss == nil {
return return
@ -29,9 +33,9 @@ func (ss *SyncerState) SetStage(v api.SyncStateStage) {
ss.lk.Lock() ss.lk.Lock()
defer ss.lk.Unlock() defer ss.lk.Unlock()
ss.Stage = v ss.data.Stage = v
if v == api.StageSyncComplete { if v == api.StageSyncComplete {
ss.End = build.Clock.Now() ss.data.End = build.Clock.Now()
} }
} }
@ -42,13 +46,13 @@ func (ss *SyncerState) Init(base, target *types.TipSet) {
ss.lk.Lock() ss.lk.Lock()
defer ss.lk.Unlock() defer ss.lk.Unlock()
ss.Target = target ss.data.Target = target
ss.Base = base ss.data.Base = base
ss.Stage = api.StageHeaders ss.data.Stage = api.StageHeaders
ss.Height = 0 ss.data.Height = 0
ss.Message = "" ss.data.Message = ""
ss.Start = build.Clock.Now() ss.data.Start = build.Clock.Now()
ss.End = time.Time{} ss.data.End = time.Time{}
} }
func (ss *SyncerState) SetHeight(h abi.ChainEpoch) { func (ss *SyncerState) SetHeight(h abi.ChainEpoch) {
@ -58,7 +62,7 @@ func (ss *SyncerState) SetHeight(h abi.ChainEpoch) {
ss.lk.Lock() ss.lk.Lock()
defer ss.lk.Unlock() defer ss.lk.Unlock()
ss.Height = h ss.data.Height = h
} }
func (ss *SyncerState) Error(err error) { func (ss *SyncerState) Error(err error) {
@ -68,21 +72,13 @@ func (ss *SyncerState) Error(err error) {
ss.lk.Lock() ss.lk.Lock()
defer ss.lk.Unlock() defer ss.lk.Unlock()
ss.Message = err.Error() ss.data.Message = err.Error()
ss.Stage = api.StageSyncErrored ss.data.Stage = api.StageSyncErrored
ss.End = build.Clock.Now() ss.data.End = build.Clock.Now()
} }
func (ss *SyncerState) Snapshot() SyncerState { func (ss *SyncerState) Snapshot() SyncerStateSnapshot {
ss.lk.Lock() ss.lk.Lock()
defer ss.lk.Unlock() defer ss.lk.Unlock()
return SyncerState{ return ss.data
Base: ss.Base,
Target: ss.Target,
Stage: ss.Stage,
Height: ss.Height,
Message: ss.Message,
Start: ss.Start,
End: ss.End,
}
} }

View File

@ -42,11 +42,13 @@ func RunHello(mctx helpers.MetricsCtx, lc fx.Lifecycle, h host.Host, svc *hello.
return xerrors.Errorf("failed to subscribe to event bus: %w", err) return xerrors.Errorf("failed to subscribe to event bus: %w", err)
} }
ctx := helpers.LifecycleCtx(mctx, lc)
go func() { go func() {
for evt := range sub.Out() { for evt := range sub.Out() {
pic := evt.(event.EvtPeerIdentificationCompleted) pic := evt.(event.EvtPeerIdentificationCompleted)
go func() { go func() {
if err := svc.SayHello(helpers.LifecycleCtx(mctx, lc), pic.Peer); err != nil { if err := svc.SayHello(ctx, pic.Peer); err != nil {
protos, _ := h.Peerstore().GetProtocols(pic.Peer) protos, _ := h.Peerstore().GetProtocols(pic.Peer)
agent, _ := h.Peerstore().Get(pic.Peer, "AgentVersion") agent, _ := h.Peerstore().Get(pic.Peer, "AgentVersion")
if protosContains(protos, hello.ProtocolID) { if protosContains(protos, hello.ProtocolID) {