Skip to content

Commit a114e14

Browse files
fix: log source format in stream info (#1268)
rename as below - OtelLogs -> otel-logs OtelMetrics -> otel-metrics OtelTraces -> otel-traces Pmeta -> pmeta Json -> json Kinesis -> kinesis added migration steps to perform the rename for existing streams
1 parent 40b3195 commit a114e14

File tree

3 files changed

+104
-17
lines changed

3 files changed

+104
-17
lines changed

src/event/format/mod.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,19 +59,25 @@ type EventSchema = Vec<Arc<Field>>;
5959
#[derive(Default, Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Hash)]
6060
pub enum LogSource {
6161
// AWS Kinesis sends logs in the format of a json array
62+
#[serde(rename = "kinesis")]
6263
Kinesis,
6364
// OpenTelemetry sends logs according to the specification as explained here
6465
// https://github.com/open-telemetry/opentelemetry-proto/tree/v1.0.0/opentelemetry/proto/logs/v1
66+
#[serde(rename = "otel-logs")]
6567
OtelLogs,
6668
// OpenTelemetry sends traces according to the specification as explained here
6769
// https://github.com/open-telemetry/opentelemetry-proto/blob/v1.0.0/opentelemetry/proto/trace/v1/trace.proto
70+
#[serde(rename = "otel-metrics")]
6871
OtelMetrics,
6972
// OpenTelemetry sends traces according to the specification as explained here
7073
// https://github.com/open-telemetry/opentelemetry-proto/tree/v1.0.0/opentelemetry/proto/metrics/v1
74+
#[serde(rename = "otel-traces")]
7175
OtelTraces,
7276
// Internal Stream format
77+
#[serde(rename = "pmeta")]
7378
Pmeta,
7479
#[default]
80+
#[serde(rename = "json")]
7581
// Json object or array
7682
Json,
7783
Custom(String),

src/migration/mod.rs

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -246,6 +246,7 @@ async fn migrate_stream_metadata(
246246
stream_metadata_value = stream_metadata_migration::v1_v4(stream_metadata_value);
247247
stream_metadata_value = stream_metadata_migration::v4_v5(stream_metadata_value, stream);
248248
stream_metadata_value = stream_metadata_migration::v5_v6(stream_metadata_value);
249+
249250
storage
250251
.put_object(&path, to_bytes(&stream_metadata_value))
251252
.await?;
@@ -259,6 +260,7 @@ async fn migrate_stream_metadata(
259260
stream_metadata_value = stream_metadata_migration::v2_v4(stream_metadata_value);
260261
stream_metadata_value = stream_metadata_migration::v4_v5(stream_metadata_value, stream);
261262
stream_metadata_value = stream_metadata_migration::v5_v6(stream_metadata_value);
263+
262264
storage
263265
.put_object(&path, to_bytes(&stream_metadata_value))
264266
.await?;
@@ -272,13 +274,15 @@ async fn migrate_stream_metadata(
272274
stream_metadata_value = stream_metadata_migration::v3_v4(stream_metadata_value);
273275
stream_metadata_value = stream_metadata_migration::v4_v5(stream_metadata_value, stream);
274276
stream_metadata_value = stream_metadata_migration::v5_v6(stream_metadata_value);
277+
275278
storage
276279
.put_object(&path, to_bytes(&stream_metadata_value))
277280
.await?;
278281
}
279282
Some("v4") => {
280283
stream_metadata_value = stream_metadata_migration::v4_v5(stream_metadata_value, stream);
281284
stream_metadata_value = stream_metadata_migration::v5_v6(stream_metadata_value);
285+
282286
storage
283287
.put_object(&path, to_bytes(&stream_metadata_value))
284288
.await?;
@@ -289,7 +293,13 @@ async fn migrate_stream_metadata(
289293
.put_object(&path, to_bytes(&stream_metadata_value))
290294
.await?;
291295
}
292-
_ => (),
296+
_ => {
297+
stream_metadata_value =
298+
stream_metadata_migration::rename_log_source_v6(stream_metadata_value);
299+
storage
300+
.put_object(&path, to_bytes(&stream_metadata_value))
301+
.await?;
302+
}
293303
}
294304

295305
Ok(stream_metadata_value)

0 commit comments

Comments
 (0)