Skip to content

Commit ef419d0

Browse files
authored
Merge pull request #960 from bucko909/early-creates
2 parents 52b40b2 + 37d144f commit ef419d0

File tree

3 files changed

+38
-22
lines changed

3 files changed

+38
-22
lines changed

lib/carbon/cache.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
import threading
1717
from operator import itemgetter
1818
from random import choice
19-
from collections import defaultdict
19+
from collections import defaultdict, deque
2020

2121
from carbon.conf import settings
2222
from carbon import events, log
@@ -189,6 +189,7 @@ class _MetricCache(defaultdict):
189189
def __init__(self, strategy=None):
190190
self.lock = threading.Lock()
191191
self.size = 0
192+
self.new_metrics = deque()
192193
self.strategy = None
193194
if strategy:
194195
self.strategy = strategy(self)
@@ -253,6 +254,8 @@ def store(self, metric, datapoint):
253254
log.msg("MetricCache is full: self.size=%d" % self.size)
254255
events.cacheFull()
255256
else:
257+
if not self[metric]:
258+
self.new_metrics.append(metric)
256259
self.size += 1
257260
self[metric][timestamp] = value
258261
if self.strategy:

lib/carbon/util.py

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -287,7 +287,7 @@ def drain(self, cost, blocking=False):
287287
'''Given a number of tokens (or fractions) drain will return True and
288288
drain the number of tokens from the bucket if the capacity allows,
289289
otherwise we return false and leave the contents of the bucket.'''
290-
if cost <= self.tokens:
290+
if self.peek(cost):
291291
self._tokens -= cost
292292
return True
293293

@@ -310,16 +310,16 @@ def setCapacityAndFillRate(self, new_capacity, new_fill_rate):
310310
self.fill_rate = float(new_fill_rate)
311311
self._tokens = delta + self._tokens
312312

313-
@property
314-
def tokens(self):
315-
'''The tokens property will return the current number of tokens in the
316-
bucket.'''
317-
if self._tokens < self.capacity:
313+
def peek(self, cost):
314+
'''Return True if the bucket can drain cost without blocking.'''
315+
if self._tokens >= cost:
316+
return True
317+
else:
318318
now = time()
319319
delta = self.fill_rate * (now - self.timestamp)
320320
self._tokens = min(self.capacity, self._tokens + delta)
321321
self.timestamp = now
322-
return self._tokens
322+
return self._tokens >= cost
323323

324324

325325
class PluginRegistrar(type):

lib/carbon/writer.py

Lines changed: 27 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -95,24 +95,25 @@ def writeCachedDataPoints():
9595

9696
cache = MetricCache()
9797
while cache:
98-
(metric, datapoints) = cache.drain_metric()
99-
if metric is None:
100-
# end the loop
101-
break
98+
# First, create new metrics files, which is helpful for graphite-web
99+
while cache.new_metrics and (not CREATE_BUCKET or CREATE_BUCKET.peek(1)):
100+
metric = cache.new_metrics.popleft()
102101

103-
dbFileExists = state.database.exists(metric)
102+
if metric not in cache:
103+
# This metric has already been drained. There's no sense in creating it.
104+
continue
104105

105-
if not dbFileExists:
106-
if CREATE_BUCKET and not CREATE_BUCKET.drain(1):
107-
# If our tokenbucket doesn't have enough tokens available to create a new metric
108-
# file then we'll just drop the metric on the ground and move on to the next
109-
# metric.
110-
# XXX This behavior should probably be configurable to not drop metrics
111-
# when rate limiting unless our cache is too big or some other legit
112-
# reason.
113-
instrumentation.increment('droppedCreates')
106+
if state.database.exists(metric):
114107
continue
115108

109+
if CREATE_BUCKET and not CREATE_BUCKET.drain(1):
110+
# This should never actually happen as no other thread should be
111+
# draining our tokens, and we just checked for a token.
112+
# Just put the new metric back in the create list and we'll try again
113+
# after writing an update.
114+
cache.new_metrics.appendleft(metric)
115+
break
116+
116117
archiveConfig = None
117118
xFilesFactor, aggregationMethod = None, None
118119

@@ -150,6 +151,18 @@ def writeCachedDataPoints():
150151
instrumentation.increment('errors')
151152
continue
152153

154+
# now drain and persist some data
155+
(metric, datapoints) = cache.drain_metric()
156+
if metric is None:
157+
# end the loop
158+
break
159+
160+
if not state.database.exists(metric):
161+
# If we get here, the metric must still be in new_metrics. We're
162+
# creating too fast, and we'll drop this data.
163+
instrumentation.increment('droppedCreates')
164+
continue
165+
153166
# If we've got a rate limit configured lets makes sure we enforce it
154167
waitTime = 0
155168
if UPDATE_BUCKET:

0 commit comments

Comments
 (0)