Skip to content

Commit 95343bf

Browse files
fix: stringify otel attributes of array types (#1298)
This PR separates out some of the well known attribute fields as separate columns, while all the unknown attributes are pushed to a separate single column called as "other_attributes". Purpose of this change is to ensure users don't end up with too many columns if they add too many attributes.
1 parent 06293cc commit 95343bf

File tree

5 files changed

+461
-139
lines changed

5 files changed

+461
-139
lines changed

src/handlers/http/cluster/mod.rs

+5-7
Original file line numberDiff line numberDiff line change
@@ -736,13 +736,11 @@ pub async fn get_node_info<T: Metadata + DeserializeOwned>(
736736
)
737737
.await?
738738
.iter()
739-
.filter_map(|x| {
740-
match serde_json::from_slice::<T>(x) {
741-
Ok(val) => Some(val),
742-
Err(e) => {
743-
error!("Failed to parse node metadata: {:?}", e);
744-
None
745-
}
739+
.filter_map(|x| match serde_json::from_slice::<T>(x) {
740+
Ok(val) => Some(val),
741+
Err(e) => {
742+
error!("Failed to parse node metadata: {:?}", e);
743+
None
746744
}
747745
})
748746
.collect();

src/otel/logs.rs

+30-6
Original file line numberDiff line numberDiff line change
@@ -23,9 +23,11 @@ use opentelemetry_proto::tonic::logs::v1::SeverityNumber;
2323
use serde_json::Map;
2424
use serde_json::Value;
2525

26+
use super::otel_utils::add_other_attributes_if_not_empty;
2627
use super::otel_utils::collect_json_from_values;
2728
use super::otel_utils::convert_epoch_nano_to_timestamp;
2829
use super::otel_utils::insert_attributes;
30+
use super::otel_utils::merge_attributes_in_json;
2931

3032
pub const OTEL_LOG_KNOWN_FIELD_LIST: [&str; 6] = [
3133
"time_unix_nano",
@@ -58,6 +60,7 @@ fn flatten_severity(severity_number: i32) -> Map<String, Value> {
5860
/// this function is called recursively for each log record object in the otel logs
5961
pub fn flatten_log_record(log_record: &LogRecord) -> Map<String, Value> {
6062
let mut log_record_json: Map<String, Value> = Map::new();
63+
let mut other_attributes = Map::new();
6164
log_record_json.insert(
6265
"time_unix_nano".to_string(),
6366
Value::String(convert_epoch_nano_to_timestamp(
@@ -80,7 +83,11 @@ pub fn flatten_log_record(log_record: &LogRecord) -> Map<String, Value> {
8083
log_record_json.insert(key.to_owned(), body_json[key].to_owned());
8184
}
8285
}
83-
insert_attributes(&mut log_record_json, &log_record.attributes);
86+
insert_attributes(
87+
&mut log_record_json,
88+
&log_record.attributes,
89+
&mut other_attributes,
90+
);
8491
log_record_json.insert(
8592
"log_record_dropped_attributes_count".to_string(),
8693
Value::Number(log_record.dropped_attributes_count.into()),
@@ -99,6 +106,9 @@ pub fn flatten_log_record(log_record: &LogRecord) -> Map<String, Value> {
99106
Value::String(hex::encode(&log_record.trace_id)),
100107
);
101108

109+
// Add the `other_attributes` to the log record json
110+
add_other_attributes_if_not_empty(&mut log_record_json, &other_attributes);
111+
102112
log_record_json
103113
}
104114

@@ -107,14 +117,18 @@ pub fn flatten_log_record(log_record: &LogRecord) -> Map<String, Value> {
107117
fn flatten_scope_log(scope_log: &ScopeLogs) -> Vec<Map<String, Value>> {
108118
let mut vec_scope_log_json = Vec::new();
109119
let mut scope_log_json = Map::new();
110-
120+
let mut other_attributes = Map::new();
111121
if let Some(scope) = &scope_log.scope {
112122
scope_log_json.insert("scope_name".to_string(), Value::String(scope.name.clone()));
113123
scope_log_json.insert(
114124
"scope_version".to_string(),
115125
Value::String(scope.version.clone()),
116126
);
117-
insert_attributes(&mut scope_log_json, &scope.attributes);
127+
insert_attributes(
128+
&mut scope_log_json,
129+
&scope.attributes,
130+
&mut other_attributes,
131+
);
118132
scope_log_json.insert(
119133
"scope_dropped_attributes_count".to_string(),
120134
Value::Number(scope.dropped_attributes_count.into()),
@@ -132,18 +146,26 @@ fn flatten_scope_log(scope_log: &ScopeLogs) -> Vec<Map<String, Value>> {
132146
vec_scope_log_json.push(combined_json);
133147
}
134148

149+
// Add the `other_attributes` to the scope log json
150+
merge_attributes_in_json(other_attributes, &mut vec_scope_log_json);
151+
135152
vec_scope_log_json
136153
}
137154

138155
/// this function performs the custom flattening of the otel logs
139156
/// and returns a `Vec` of `Value::Object` of the flattened json
140157
pub fn flatten_otel_logs(message: &LogsData) -> Vec<Value> {
141158
let mut vec_otel_json = Vec::new();
159+
142160
for record in &message.resource_logs {
143161
let mut resource_log_json = Map::new();
144-
162+
let mut other_attributes = Map::new();
145163
if let Some(resource) = &record.resource {
146-
insert_attributes(&mut resource_log_json, &resource.attributes);
164+
insert_attributes(
165+
&mut resource_log_json,
166+
&resource.attributes,
167+
&mut other_attributes,
168+
);
147169
resource_log_json.insert(
148170
"resource_dropped_attributes_count".to_string(),
149171
Value::Number(resource.dropped_attributes_count.into()),
@@ -163,8 +185,10 @@ pub fn flatten_otel_logs(message: &LogsData) -> Vec<Value> {
163185
resource_logs_json.extend(resource_log_json.clone());
164186
}
165187

188+
// Add the `other_attributes` to the resource log json
189+
merge_attributes_in_json(other_attributes, &mut vec_resource_logs_json);
190+
166191
vec_otel_json.extend(vec_resource_logs_json);
167192
}
168-
169193
vec_otel_json.into_iter().map(Value::Object).collect()
170194
}

0 commit comments

Comments
 (0)