Commit f02ecae

Move hardware emit to separate component.
1 parent ae76f5f commit f02ecae

2 files changed, +157 -74 lines changed

internal/internal_worker_base.go  +33 -74
@@ -28,12 +28,10 @@ import (
     "errors"
     "fmt"
     "os"
-    "runtime"
     "sync"
     "syscall"
     "time"
 
-    "github.com/shirou/gopsutil/cpu"
     "github.com/uber-go/tally"
     "go.uber.org/zap"
     "go.uber.org/zap/zapcore"
@@ -140,10 +138,11 @@ type (
         logger       *zap.Logger
         metricsScope tally.Scope
 
-        pollerRequestCh    chan struct{}
-        pollerAutoScaler   *pollerAutoScaler
-        taskQueueCh        chan interface{}
-        sessionTokenBucket *sessionTokenBucket
+        pollerRequestCh      chan struct{}
+        pollerAutoScaler     *pollerAutoScaler
+        workerUsageCollector *workerUsageCollector
+        taskQueueCh          chan interface{}
+        sessionTokenBucket   *sessionTokenBucket
     }
 
     polledTask struct {
@@ -173,17 +172,29 @@ func newBaseWorker(options baseWorkerOptions, logger *zap.Logger, metricsScope t
            logger,
        )
    }
+   // for now it's default to be enabled
+   var workerUC *workerUsageCollector
+   workerUC = newWorkerUsageCollector(
+       workerUsageCollectorOptions{
+           Enabled:      true,
+           Cooldown:     30 * time.Second,
+           Host:         options.host,
+           MetricsScope: metricsScope,
+       },
+       logger,
+   )
 
    bw := &baseWorker{
-       options:          options,
-       shutdownCh:       make(chan struct{}),
-       taskLimiter:      rate.NewLimiter(rate.Limit(options.maxTaskPerSecond), 1),
-       retrier:          backoff.NewConcurrentRetrier(pollOperationRetryPolicy),
-       logger:           logger.With(zapcore.Field{Key: tagWorkerType, Type: zapcore.StringType, String: options.workerType}),
-       metricsScope:     tagScope(metricsScope, tagWorkerType, options.workerType),
-       pollerRequestCh:  make(chan struct{}, options.maxConcurrentTask),
-       pollerAutoScaler: pollerAS,
-       taskQueueCh:      make(chan interface{}), // no buffer, so poller only able to poll new task after previous is dispatched.
+       options:              options,
+       shutdownCh:           make(chan struct{}),
+       taskLimiter:          rate.NewLimiter(rate.Limit(options.maxTaskPerSecond), 1),
+       retrier:              backoff.NewConcurrentRetrier(pollOperationRetryPolicy),
+       logger:               logger.With(zapcore.Field{Key: tagWorkerType, Type: zapcore.StringType, String: options.workerType}),
+       metricsScope:         tagScope(metricsScope, tagWorkerType, options.workerType),
+       pollerRequestCh:      make(chan struct{}, options.maxConcurrentTask),
+       pollerAutoScaler:     pollerAS,
+       workerUsageCollector: workerUC,
+       taskQueueCh:          make(chan interface{}), // no buffer, so poller only able to poll new task after previous is dispatched.
 
        limiterContext:       ctx,
        limiterContextCancel: cancel,
@@ -207,6 +218,10 @@ func (bw *baseWorker) Start() {
        bw.pollerAutoScaler.Start()
    }
 
+   if bw.workerUsageCollector != nil {
+       bw.workerUsageCollector.Start()
+   }
+
    for i := 0; i < bw.options.pollerCount; i++ {
        bw.shutdownWG.Add(1)
        go bw.runPoller()
@@ -215,15 +230,6 @@ func (bw *baseWorker) Start() {
    bw.shutdownWG.Add(1)
    go bw.runTaskDispatcher()
 
-   // We want the emit function run once per host instead of run once per worker
-   //collectHardwareUsageOnce.Do(func() {
-   //  bw.shutdownWG.Add(1)
-   //  go bw.emitHardwareUsage()
-   //})
-
-   bw.shutdownWG.Add(1)
-   go bw.emitHardwareUsage()
-
    bw.isWorkerStarted = true
    traceLog(func() {
        bw.logger.Info("Started Worker",
@@ -407,6 +413,9 @@ func (bw *baseWorker) Stop() {
    if bw.pollerAutoScaler != nil {
        bw.pollerAutoScaler.Stop()
    }
+   if bw.workerUsageCollector != nil {
+       bw.workerUsageCollector.Stop()
+   }
 
    if success := util.AwaitWaitGroup(&bw.shutdownWG, bw.options.shutdownTimeout); !success {
        traceLog(func() {
@@ -420,53 +429,3 @@ func (bw *baseWorker) Stop() {
    }
    return
 }
-
-func (bw *baseWorker) emitHardwareUsage() {
-   defer func() {
-       if p := recover(); p != nil {
-           bw.metricsScope.Counter(metrics.WorkerPanicCounter).Inc(1)
-           topLine := fmt.Sprintf("base worker for %s [panic]:", bw.options.workerType)
-           st := getStackTraceRaw(topLine, 7, 0)
-           bw.logger.Error("Unhandled panic in hardware emitting.",
-               zap.String(tagPanicError, fmt.Sprintf("%v", p)),
-               zap.String(tagPanicStack, st))
-       }
-   }()
-   defer bw.shutdownWG.Done()
-   collectHardwareUsageOnce.Do(
-       func() {
-           ticker := time.NewTicker(hardwareMetricsCollectInterval)
-           for {
-               select {
-               case <-bw.shutdownCh:
-                   ticker.Stop()
-                   return
-               case <-ticker.C:
-                   host := bw.options.host
-                   scope := bw.metricsScope.Tagged(map[string]string{clientHostTag: host})
-
-                   cpuPercent, err := cpu.Percent(0, false)
-                   if err != nil {
-                       bw.logger.Warn("Failed to get cpu percent", zap.Error(err))
-                       return
-                   }
-                   cpuCores, err := cpu.Counts(false)
-                   if err != nil {
-                       bw.logger.Warn("Failed to get number of cpu cores", zap.Error(err))
-                       return
-                   }
-                   scope.Gauge(metrics.NumCPUCores).Update(float64(cpuCores))
-                   scope.Gauge(metrics.CPUPercentage).Update(cpuPercent[0])
-
-                   var memStats runtime.MemStats
-                   runtime.ReadMemStats(&memStats)
-
-                   scope.Gauge(metrics.NumGoRoutines).Update(float64(runtime.NumGoroutine()))
-                   scope.Gauge(metrics.TotalMemory).Update(float64(memStats.Sys))
-                   scope.Gauge(metrics.MemoryUsedHeap).Update(float64(memStats.HeapInuse))
-                   scope.Gauge(metrics.MemoryUsedStack).Update(float64(memStats.StackInuse))
-               }
-           }
-       })
-
-}
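
The block removed above carried the note that hardware metrics should be emitted once per host rather than once per worker, and the new collector still wraps its loop in collectHardwareUsageOnce. A minimal standalone sketch of that sync.Once pattern (the names collectOnce and startCollector are illustrative only, not from the repository):

// Illustrative sketch: several workers call startCollector, but sync.Once
// guarantees that only the first caller launches the single collection loop,
// which is how emission stays once-per-host instead of once-per-worker.
package main

import (
    "fmt"
    "sync"
)

var collectOnce sync.Once // plays the role of collectHardwareUsageOnce

func startCollector(workerID int) {
    collectOnce.Do(func() {
        fmt.Printf("worker %d launched the single collection loop\n", workerID)
    })
}

func main() {
    var wg sync.WaitGroup
    for i := 0; i < 3; i++ {
        wg.Add(1)
        go func(id int) {
            defer wg.Done()
            startCollector(id) // only one Do body ever runs per process
        }(i)
    }
    wg.Wait()
}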
(new file)  +124
@@ -0,0 +1,124 @@
+package internal
+
+import (
+   "context"
+   "github.com/shirou/gopsutil/cpu"
+   "github.com/uber-go/tally"
+   "go.uber.org/cadence/internal/common/metrics"
+   "go.uber.org/zap"
+   "runtime"
+   "sync"
+   "time"
+)
+
+type (
+   workerUsageCollector struct {
+       cooldownTime time.Duration
+       logger       *zap.Logger
+       ctx          context.Context
+       wg           *sync.WaitGroup // graceful stop
+       cancel       context.CancelFunc
+       metricsScope tally.Scope
+       host         string
+   }
+
+   workerUsageCollectorOptions struct {
+       Enabled      bool
+       Cooldown     time.Duration
+       Host         string
+       MetricsScope tally.Scope
+   }
+
+   hardwareUsage struct {
+       NumCPUCores     int
+       CPUPercent      float64
+       NumGoRoutines   int
+       TotalMemory     float64
+       MemoryUsedHeap  float64
+       MemoryUsedStack float64
+   }
+)
+
+func newWorkerUsageCollector(
+   options workerUsageCollectorOptions,
+   logger *zap.Logger,
+) *workerUsageCollector {
+   if !options.Enabled {
+       return nil
+   }
+   ctx, cancel := context.WithCancel(context.Background())
+   return &workerUsageCollector{
+       cooldownTime: options.Cooldown,
+       host:         options.Host,
+       metricsScope: options.MetricsScope,
+       logger:       logger,
+       ctx:          ctx,
+       cancel:       cancel,
+       wg:           &sync.WaitGroup{},
+   }
+}
+
+func (w *workerUsageCollector) Start() {
+   w.wg.Add(1)
+   go func() {
+       defer func() {
+           if p := recover(); p != nil {
+               w.logger.Error("Unhandled panic in workerUsageCollector.")
+           }
+       }()
+       defer w.wg.Done()
+       collectHardwareUsageOnce.Do(
+           func() {
+               ticker := time.NewTicker(w.cooldownTime)
+               for {
+                   select {
+                   case <-w.ctx.Done():
+                       return
+                   case <-ticker.C:
+                       hardwareUsageData := w.collectHardwareUsage()
+                       w.emitHardwareUsage(hardwareUsageData)
+
+                   }
+               }
+           })
+   }()
+   return
+}
+
+func (w *workerUsageCollector) Stop() {
+   w.cancel()
+   w.wg.Wait()
+}
+
+func (w *workerUsageCollector) collectHardwareUsage() hardwareUsage {
+   cpuPercent, err := cpu.Percent(0, false)
+   if err != nil {
+       w.logger.Warn("Failed to get cpu percent", zap.Error(err))
+   }
+   cpuCores, err := cpu.Counts(false)
+   if err != nil {
+       w.logger.Warn("Failed to get number of cpu cores", zap.Error(err))
+   }
+
+   var memStats runtime.MemStats
+   runtime.ReadMemStats(&memStats)
+   return hardwareUsage{
+       NumCPUCores:     cpuCores,
+       CPUPercent:      cpuPercent[0],
+       NumGoRoutines:   runtime.NumGoroutine(),
+       TotalMemory:     float64(memStats.Sys),
+       MemoryUsedHeap:  float64(memStats.HeapAlloc),
+       MemoryUsedStack: float64(memStats.StackInuse),
+   }
+}
+
+// emitHardwareUsage emits collected hardware usage metrics to metrics scope
+func (w *workerUsageCollector) emitHardwareUsage(usage hardwareUsage) {
+   scope := w.metricsScope.Tagged(map[string]string{clientHostTag: w.host})
+   scope.Gauge(metrics.NumCPUCores).Update(float64(usage.NumCPUCores))
+   scope.Gauge(metrics.CPUPercentage).Update(usage.CPUPercent)
+   scope.Gauge(metrics.NumGoRoutines).Update(float64(usage.NumGoRoutines))
+   scope.Gauge(metrics.TotalMemory).Update(float64(usage.TotalMemory))
+   scope.Gauge(metrics.MemoryUsedHeap).Update(float64(usage.MemoryUsedHeap))
+   scope.Gauge(metrics.MemoryUsedStack).Update(float64(usage.MemoryUsedStack))
+}
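
For context, below is a self-contained sketch of the collect-and-emit loop the new component runs. It is an approximation, not the library code: it substitutes a tally test scope and placeholder metric and tag names for the package constants (metrics.NumCPUCores, clientHostTag, and so on), shortens the cooldown, and omits the collectHardwareUsageOnce guard.

// A self-contained sketch of the collect-and-emit loop added in this commit,
// using placeholder metric/tag names and a tally test scope.
package main

import (
    "context"
    "fmt"
    "runtime"
    "time"

    "github.com/shirou/gopsutil/cpu"
    "github.com/uber-go/tally"
)

func main() {
    ctx, cancel := context.WithCancel(context.Background())
    scope := tally.NewTestScope("worker", map[string]string{"host": "example-host"})

    done := make(chan struct{})
    go func() {
        defer close(done)
        ticker := time.NewTicker(1 * time.Second) // the commit wires a 30s cooldown
        defer ticker.Stop()
        for {
            select {
            case <-ctx.Done():
                return
            case <-ticker.C:
                // Combined CPU usage since the previous call (percpu = false).
                if pct, err := cpu.Percent(0, false); err == nil && len(pct) > 0 {
                    scope.Gauge("cpu-percentage").Update(pct[0])
                }
                if cores, err := cpu.Counts(false); err == nil {
                    scope.Gauge("num-cpu-cores").Update(float64(cores))
                }
                var m runtime.MemStats
                runtime.ReadMemStats(&m)
                scope.Gauge("num-go-routines").Update(float64(runtime.NumGoroutine()))
                scope.Gauge("memory-used-heap").Update(float64(m.HeapAlloc))
            }
        }
    }()

    time.Sleep(3 * time.Second)
    cancel() // mirrors what workerUsageCollector.Stop triggers via its context
    <-done
    fmt.Println("collector stopped")
}

Cancelling the context is how Stop ends the loop in the real collector; the done channel here stands in for the WaitGroup it uses for a graceful shutdown.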
