From 7245ac2b699263d91321c4286f9b1a51eac4a91c Mon Sep 17 00:00:00 2001
From: Steven Allen
Date: Sat, 10 Oct 2020 08:31:04 -0700
Subject: [PATCH 1/2] fix a race in the sync manager

1. SyncerState contains a mutex and should never be copied. Honestly, this
   case was probably fine, but it's just as easy to create a separate
   snapshot type and easier to reason about.
2. We need to initialize the syncStates array once at start, before
   accessing it. By initializing each syncer state inside each worker, we
   were racing with calls to `State()`. Again, this was probably benign,
   but I don't trust optimizing compilers.
---
 chain/sync.go         |  2 +-
 chain/sync_manager.go | 15 ++++++++------
 chain/syncstate.go    | 46 ++++++++++++++++++++-----------------------
 3 files changed, 31 insertions(+), 32 deletions(-)

diff --git a/chain/sync.go b/chain/sync.go
index 369c65d33..f71100621 100644
--- a/chain/sync.go
+++ b/chain/sync.go
@@ -1726,7 +1726,7 @@ func VerifyElectionPoStVRF(ctx context.Context, worker address.Address, rand []b
 	return gen.VerifyVRF(ctx, worker, rand, evrf)
 }
 
-func (syncer *Syncer) State() []SyncerState {
+func (syncer *Syncer) State() []SyncerStateSnapshot {
 	return syncer.syncmgr.State()
 }
 
diff --git a/chain/sync_manager.go b/chain/sync_manager.go
index 811092bc7..c7fdea726 100644
--- a/chain/sync_manager.go
+++ b/chain/sync_manager.go
@@ -38,7 +38,7 @@ type SyncManager interface {
 	SetPeerHead(ctx context.Context, p peer.ID, ts *types.TipSet)
 
 	// State retrieves the state of the sync workers.
-	State() []SyncerState
+	State() []SyncerStateSnapshot
 }
 
 type syncManager struct {
@@ -79,7 +79,7 @@ type syncResult struct {
 const syncWorkerCount = 3
 
 func NewSyncManager(sync SyncFunc) SyncManager {
-	return &syncManager{
+	sm := &syncManager{
 		bspThresh:      1,
 		peerHeads:      make(map[peer.ID]*types.TipSet),
 		syncTargets:    make(chan *types.TipSet),
@@ -90,6 +90,10 @@ func NewSyncManager(sync SyncFunc) SyncManager {
 		doSync: sync,
 		stop:   make(chan struct{}),
 	}
+	for i := range sm.syncStates {
+		sm.syncStates[i] = new(SyncerState)
+	}
+	return sm
 }
 
 func (sm *syncManager) Start() {
@@ -128,8 +132,8 @@ func (sm *syncManager) SetPeerHead(ctx context.Context, p peer.ID, ts *types.Tip
 	sm.incomingTipSets <- ts
 }
 
-func (sm *syncManager) State() []SyncerState {
-	ret := make([]SyncerState, 0, len(sm.syncStates))
+func (sm *syncManager) State() []SyncerStateSnapshot {
+	ret := make([]SyncerStateSnapshot, 0, len(sm.syncStates))
 	for _, s := range sm.syncStates {
 		ret = append(ret, s.Snapshot())
 	}
@@ -405,8 +409,7 @@ func (sm *syncManager) scheduleWorkSent() {
 }
 
 func (sm *syncManager) syncWorker(id int) {
-	ss := &SyncerState{}
-	sm.syncStates[id] = ss
+	ss := sm.syncStates[id]
 	for {
 		select {
 		case ts, ok := <-sm.syncTargets:
diff --git a/chain/syncstate.go b/chain/syncstate.go
index 06cd5d91e..26f9f1c39 100644
--- a/chain/syncstate.go
+++ b/chain/syncstate.go
@@ -11,8 +11,7 @@ import (
 	"github.com/filecoin-project/lotus/chain/types"
 )
 
-type SyncerState struct {
-	lk      sync.Mutex
+type SyncerStateSnapshot struct {
 	Target  *types.TipSet
 	Base    *types.TipSet
 	Stage   api.SyncStateStage
@@ -22,6 +21,11 @@ type SyncerState struct {
 	End     time.Time
 }
 
+type SyncerState struct {
+	lk   sync.Mutex
+	data SyncerStateSnapshot
+}
+
 func (ss *SyncerState) SetStage(v api.SyncStateStage) {
 	if ss == nil {
 		return
@@ -29,9 +33,9 @@ func (ss *SyncerState) SetStage(v api.SyncStateStage) {
 
 	ss.lk.Lock()
 	defer ss.lk.Unlock()
-	ss.Stage = v
+	ss.data.Stage = v
 	if v == api.StageSyncComplete {
-		ss.End = build.Clock.Now()
+		ss.data.End = build.Clock.Now()
 	}
 }
 
@@ -42,13 +46,13 @@ func (ss *SyncerState) Init(base, target *types.TipSet) {
 
 	ss.lk.Lock()
 	defer ss.lk.Unlock()
-	ss.Target = target
-	ss.Base = base
-	ss.Stage = api.StageHeaders
-	ss.Height = 0
-	ss.Message = ""
-	ss.Start = build.Clock.Now()
-	ss.End = time.Time{}
+	ss.data.Target = target
+	ss.data.Base = base
+	ss.data.Stage = api.StageHeaders
+	ss.data.Height = 0
+	ss.data.Message = ""
+	ss.data.Start = build.Clock.Now()
+	ss.data.End = time.Time{}
 }
 
 func (ss *SyncerState) SetHeight(h abi.ChainEpoch) {
@@ -58,7 +62,7 @@ func (ss *SyncerState) SetHeight(h abi.ChainEpoch) {
 
 	ss.lk.Lock()
 	defer ss.lk.Unlock()
-	ss.Height = h
+	ss.data.Height = h
 }
 
 func (ss *SyncerState) Error(err error) {
@@ -68,21 +72,13 @@ func (ss *SyncerState) Error(err error) {
 
 	ss.lk.Lock()
 	defer ss.lk.Unlock()
-	ss.Message = err.Error()
-	ss.Stage = api.StageSyncErrored
-	ss.End = build.Clock.Now()
+	ss.data.Message = err.Error()
+	ss.data.Stage = api.StageSyncErrored
+	ss.data.End = build.Clock.Now()
 }
 
-func (ss *SyncerState) Snapshot() SyncerState {
+func (ss *SyncerState) Snapshot() SyncerStateSnapshot {
 	ss.lk.Lock()
 	defer ss.lk.Unlock()
-	return SyncerState{
-		Base:    ss.Base,
-		Target:  ss.Target,
-		Stage:   ss.Stage,
-		Height:  ss.Height,
-		Message: ss.Message,
-		Start:   ss.Start,
-		End:     ss.End,
-	}
+	return ss.data
 }
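
For readers following the first fix: the shape it moves to is a plain-data
snapshot type that is safe to return by value, wrapped by a mutex-guarded
live state, with every shared state allocated up front before any worker
goroutine starts. Below is a minimal, self-contained Go sketch of that
pattern; the names here (snapshot, state, SetHeight) are illustrative only
and are not part of the lotus code base.

    package main

    import (
        "fmt"
        "sync"
    )

    // snapshot holds plain data and no lock, so returning it by value is safe.
    type snapshot struct {
        Height  int
        Message string
    }

    // state guards the snapshot with a mutex and must never be copied.
    type state struct {
        lk   sync.Mutex
        data snapshot
    }

    func (s *state) SetHeight(h int) {
        s.lk.Lock()
        defer s.lk.Unlock()
        s.data.Height = h
    }

    // Snapshot returns a consistent copy of the data, not the lock.
    func (s *state) Snapshot() snapshot {
        s.lk.Lock()
        defer s.lk.Unlock()
        return s.data
    }

    func main() {
        // Allocate all shared states before starting workers, so readers
        // never race with a worker's lazy initialization.
        states := make([]*state, 3)
        for i := range states {
            states[i] = new(state)
        }

        var wg sync.WaitGroup
        for i, st := range states {
            wg.Add(1)
            go func(i int, st *state) {
                defer wg.Done()
                st.SetHeight(i * 100)
            }(i, st)
        }

        // Readers can take snapshots concurrently with the writers above.
        for _, st := range states {
            _ = st.Snapshot()
        }
        wg.Wait()

        for _, st := range states {
            fmt.Println(st.Snapshot().Height)
        }
    }

Running a sketch like this under `go run -race` passes; by contrast, the old
approach of assigning each state slot from inside its worker while `State()`
reads the same slot is the kind of unsynchronized access the race detector
can flag, which is what the commit message is pointing at.
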
From c463582528150150ae5571901e1005f5475d89ff Mon Sep 17 00:00:00 2001
From: Steven Allen
Date: Sat, 10 Oct 2020 08:33:06 -0700
Subject: [PATCH 2/2] fix a race and optimize hello messages

LifecycleCtx can _only_ be called during startup as it appends an fx hook.
Worse, this was causing us to append an fx hook on every single hello
message, leaking memory (and probably causing other shutdown issues...).
---
 node/modules/services.go | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/node/modules/services.go b/node/modules/services.go
index 4ee0abacc..e0a7c2eda 100644
--- a/node/modules/services.go
+++ b/node/modules/services.go
@@ -42,11 +42,13 @@ func RunHello(mctx helpers.MetricsCtx, lc fx.Lifecycle, h host.Host, svc *hello.
 		return xerrors.Errorf("failed to subscribe to event bus: %w", err)
 	}
 
+	ctx := helpers.LifecycleCtx(mctx, lc)
+
 	go func() {
 		for evt := range sub.Out() {
 			pic := evt.(event.EvtPeerIdentificationCompleted)
 			go func() {
-				if err := svc.SayHello(helpers.LifecycleCtx(mctx, lc), pic.Peer); err != nil {
+				if err := svc.SayHello(ctx, pic.Peer); err != nil {
 					protos, _ := h.Peerstore().GetProtocols(pic.Peer)
 					agent, _ := h.Peerstore().Get(pic.Peer, "AgentVersion")
 					if protosContains(protos, hello.ProtocolID) {
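
On the mechanics behind the second fix: a lifecycle-scoped context helper
works by appending an fx.Hook whose OnStop cancels the context, so calling
such a helper once per hello message appends one hook per message for the
life of the node. The sketch below illustrates the effect, assuming the
go.uber.org/fx dependency is available; lifecycleCtx and toyLifecycle are
illustrative stand-ins, not the actual helpers.LifecycleCtx implementation.

    package main

    import (
        "context"
        "fmt"

        "go.uber.org/fx"
    )

    // toyLifecycle satisfies fx.Lifecycle just for this sketch; in a real
    // node the Lifecycle comes from fx and hooks may only be appended
    // during startup.
    type toyLifecycle struct {
        hooks []fx.Hook
    }

    func (l *toyLifecycle) Append(h fx.Hook) { l.hooks = append(l.hooks, h) }

    // lifecycleCtx is an illustrative stand-in for a lifecycle-scoped
    // context helper: it returns a context cancelled on shutdown, and
    // every call appends another hook.
    func lifecycleCtx(lc fx.Lifecycle) context.Context {
        ctx, cancel := context.WithCancel(context.Background())
        lc.Append(fx.Hook{
            OnStop: func(context.Context) error {
                cancel()
                return nil
            },
        })
        return ctx
    }

    func main() {
        // Buggy pattern: deriving the context per message accumulates one
        // hook per hello message for the life of the process.
        buggy := &toyLifecycle{}
        for i := 0; i < 1000; i++ {
            _ = lifecycleCtx(buggy)
        }
        fmt.Println("hooks after per-message calls:", len(buggy.hooks)) // 1000

        // Fixed pattern: derive the context once at startup and reuse it.
        fixed := &toyLifecycle{}
        ctx := lifecycleCtx(fixed)
        for i := 0; i < 1000; i++ {
            _ = ctx // every message reuses the startup-scoped context
        }
        fmt.Println("hooks after a single startup call:", len(fixed.hooks)) // 1
    }

This mirrors what the patch does in RunHello: derive the context once, right
after subscribing to the event bus, and hand the same context to every
SayHello call.
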