Skip to content

Commit 01d4449

Browse files
authored
fix: delete date stats (#1306)
1 parent a90ed13 commit 01d4449

File tree

4 files changed

+37
-32
lines changed

4 files changed

+37
-32
lines changed

src/handlers/http/logstream.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -57,9 +57,9 @@ pub async fn delete(stream_name: Path<String>) -> Result<impl Responder, StreamE
5757
objectstore.delete_stream(&stream_name).await?;
5858
// Delete from staging
5959
let stream_dir = PARSEABLE.get_or_create_stream(&stream_name);
60-
if fs::remove_dir_all(&stream_dir.data_path).is_err() {
60+
if let Err(err) = fs::remove_dir_all(&stream_dir.data_path) {
6161
warn!(
62-
"failed to delete local data for stream {}. Clean {} manually",
62+
"failed to delete local data for stream {} with error {err}. Clean {} manually",
6363
stream_name,
6464
stream_dir.data_path.to_string_lossy()
6565
)

src/handlers/http/modal/ingest/ingestor_logstream.rs

+2-13
Original file line numberDiff line numberDiff line change
@@ -59,23 +59,12 @@ pub async fn retention_cleanup(
5959

6060
pub async fn delete(stream_name: Path<String>) -> Result<impl Responder, StreamError> {
6161
let stream_name = stream_name.into_inner();
62-
// if the stream not found in memory map,
63-
//check if it exists in the storage
64-
//create stream and schema from storage
65-
if !PARSEABLE.streams.contains(&stream_name)
66-
&& !PARSEABLE
67-
.create_stream_and_schema_from_storage(&stream_name)
68-
.await
69-
.unwrap_or(false)
70-
{
71-
return Err(StreamNotFound(stream_name.clone()).into());
72-
}
7362

7463
// Delete from staging
7564
let stream_dir = PARSEABLE.get_stream(&stream_name)?;
76-
if fs::remove_dir_all(&stream_dir.data_path).is_err() {
65+
if let Err(err) = fs::remove_dir_all(&stream_dir.data_path) {
7766
warn!(
78-
"failed to delete local data for stream {}. Clean {} manually",
67+
"failed to delete local data for stream {} with error {err}. Clean {} manually",
7968
stream_name,
8069
stream_dir.data_path.to_string_lossy()
8170
)

src/handlers/http/modal/query/querier_logstream.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -68,9 +68,9 @@ pub async fn delete(stream_name: Path<String>) -> Result<impl Responder, StreamE
6868
// Delete from storage
6969
objectstore.delete_stream(&stream_name).await?;
7070
let stream_dir = PARSEABLE.get_or_create_stream(&stream_name);
71-
if fs::remove_dir_all(&stream_dir.data_path).is_err() {
71+
if let Err(err) = fs::remove_dir_all(&stream_dir.data_path) {
7272
warn!(
73-
"failed to delete local data for stream {}. Clean {} manually",
73+
"failed to delete local data for stream {} with error {err}. Clean {} manually",
7474
stream_name,
7575
stream_dir.data_path.to_string_lossy()
7676
)

src/stats.rs

+31-15
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,10 @@
1616
*
1717
*/
1818

19+
use std::collections::HashMap;
1920
use std::sync::Arc;
2021

22+
use once_cell::sync::Lazy;
2123
use prometheus::core::Collector;
2224
use prometheus::proto::MetricFamily;
2325
use prometheus::IntGaugeVec;
@@ -171,15 +173,15 @@ pub fn delete_stats(stream_name: &str, format: &'static str) -> prometheus::Resu
171173
let event_labels = event_labels(stream_name, format);
172174
let storage_size_labels = storage_size_labels(stream_name);
173175

174-
EVENTS_INGESTED.remove_label_values(&event_labels)?;
175-
EVENTS_INGESTED_SIZE.remove_label_values(&event_labels)?;
176-
STORAGE_SIZE.remove_label_values(&storage_size_labels)?;
177-
EVENTS_DELETED.remove_label_values(&event_labels)?;
178-
EVENTS_DELETED_SIZE.remove_label_values(&event_labels)?;
179-
DELETED_EVENTS_STORAGE_SIZE.remove_label_values(&storage_size_labels)?;
180-
LIFETIME_EVENTS_INGESTED.remove_label_values(&event_labels)?;
181-
LIFETIME_EVENTS_INGESTED_SIZE.remove_label_values(&event_labels)?;
182-
LIFETIME_EVENTS_STORAGE_SIZE.remove_label_values(&storage_size_labels)?;
176+
remove_label_values(&EVENTS_INGESTED, &event_labels);
177+
remove_label_values(&EVENTS_INGESTED_SIZE, &event_labels);
178+
remove_label_values(&STORAGE_SIZE, &storage_size_labels);
179+
remove_label_values(&EVENTS_DELETED, &event_labels);
180+
remove_label_values(&EVENTS_DELETED_SIZE, &event_labels);
181+
remove_label_values(&DELETED_EVENTS_STORAGE_SIZE, &storage_size_labels);
182+
remove_label_values(&LIFETIME_EVENTS_INGESTED, &event_labels);
183+
remove_label_values(&LIFETIME_EVENTS_INGESTED_SIZE, &event_labels);
184+
remove_label_values(&LIFETIME_EVENTS_STORAGE_SIZE, &storage_size_labels);
183185

184186
delete_with_label_prefix(&EVENTS_INGESTED_DATE, &event_labels);
185187
delete_with_label_prefix(&EVENTS_INGESTED_SIZE_DATE, &event_labels);
@@ -188,15 +190,29 @@ pub fn delete_stats(stream_name: &str, format: &'static str) -> prometheus::Resu
188190
Ok(())
189191
}
190192

193+
#[inline]
194+
fn remove_label_values(lazy_static: &Lazy<IntGaugeVec>, event_labels: &[&str]) {
195+
if let Err(e) = lazy_static.remove_label_values(event_labels) {
196+
warn!("Unable to delete labels- {event_labels:?}\nwith error- {e}");
197+
}
198+
}
199+
191200
fn delete_with_label_prefix(metrics: &IntGaugeVec, prefix: &[&str]) {
192201
let families: Vec<MetricFamily> = metrics.collect().into_iter().collect();
193202
for metric in families.iter().flat_map(|m| m.get_metric()) {
194-
let label: Vec<&str> = metric.get_label().iter().map(|l| l.get_value()).collect();
195-
if !label.starts_with(prefix) {
196-
continue;
197-
}
198-
if let Err(err) = metrics.remove_label_values(&label) {
199-
warn!("Error = {err}");
203+
let label_map: HashMap<&str, &str> = metric
204+
.get_label()
205+
.iter()
206+
.map(|l| (l.get_name(), l.get_value()))
207+
.collect();
208+
209+
// Check if all prefix elements are present in label values
210+
let all_prefixes_found = prefix.iter().all(|p| label_map.values().any(|v| v == p));
211+
212+
if all_prefixes_found {
213+
if let Err(err) = metrics.remove(&label_map) {
214+
warn!("Error removing metric with labels {:?}: {err}", label_map);
215+
}
200216
}
201217
}
202218
}

0 commit comments

Comments
 (0)