Skip to content

Commit a9cb8d4

Browse files
committed
Add support for JSON/camelcase conversion
1 parent d2b6fec commit a9cb8d4

File tree

3 files changed

+108
-18
lines changed

3 files changed

+108
-18
lines changed

protobuf-converter/src/main/java/io/confluent/connect/protobuf/ProtobufData.java

+35-8
Original file line numberDiff line numberDiff line change
@@ -306,6 +306,7 @@ public class ProtobufData {
306306
private boolean useWrapperForRawPrimitives;
307307
private boolean generateStructForNulls;
308308
private boolean generateIndexForUnions;
309+
private boolean useJsonFieldNames;
309310

310311
public ProtobufData() {
311312
this(new ProtobufDataConfig.Builder().with(
@@ -332,6 +333,7 @@ public ProtobufData(ProtobufDataConfig protobufDataConfig) {
332333
this.useWrapperForRawPrimitives = protobufDataConfig.useWrapperForRawPrimitives();
333334
this.generateStructForNulls = protobufDataConfig.generateStructForNulls();
334335
this.generateIndexForUnions = protobufDataConfig.generateIndexForUnions();
336+
this.useJsonFieldNames = protobufDataConfig.useJsonFieldNames();
335337
}
336338

337339
/**
@@ -519,10 +521,14 @@ private Object fromConnectData(
519521
throw new DataException("Invalid message name: " + scopedStructName);
520522
}
521523
for (Field field : schema.fields()) {
522-
String fieldName = scrubName(field.name());
523-
Object fieldCtx = getFieldType(ctx, fieldName);
524+
String fieldName = field.name();
525+
if (useJsonFieldNames) {
526+
fieldName = fromJsonCase(fieldName);
527+
}
528+
String scrubbedFieldName = scrubName(fieldName);
529+
Object fieldCtx = getFieldType(ctx, scrubbedFieldName);
524530
Object connectFieldVal = ignoreDefaultForNullables
525-
? struct.getWithoutDefault(field.name()) : struct.get(field);
531+
? struct.getWithoutDefault(fieldName) : struct.get(field);
526532
Object fieldValue = fromConnectData(
527533
fieldCtx,
528534
field.schema(),
@@ -539,10 +545,10 @@ private Object fromConnectData(
539545
fieldValue = union.getValue();
540546
} else {
541547
fieldDescriptor = messageBuilder.getDescriptorForType()
542-
.findFieldByName(fieldName);
548+
.findFieldByName(scrubbedFieldName);
543549
}
544550
if (fieldDescriptor == null) {
545-
throw new DataException("Cannot find field with name " + fieldName);
551+
throw new DataException("Cannot find field with name " + scrubbedFieldName);
546552
}
547553
if (fieldValue != null) {
548554
messageBuilder.setField(fieldDescriptor, fieldValue);
@@ -727,12 +733,16 @@ private MessageDefinition messageDefinitionFromConnectSchema(
727733
String fieldTag = fieldSchema.parameters() != null ? fieldSchema.parameters()
728734
.get(PROTOBUF_TYPE_TAG) : null;
729735
int tag = fieldTag != null ? Integer.parseInt(fieldTag) : index++;
736+
String fieldName = field.name();
737+
if (useJsonFieldNames) {
738+
fieldName = fromJsonCase(fieldName);
739+
}
730740
FieldDefinition fieldDef = fieldDefinitionFromConnectSchema(
731741
ctx,
732742
schema,
733743
message,
734744
fieldSchema,
735-
scrubName(field.name()),
745+
scrubName(fieldName),
736746
tag
737747
);
738748
if (fieldDef != null) {
@@ -763,6 +773,22 @@ private MessageDefinition messageDefinitionFromConnectSchema(
763773
return message.build();
764774
}
765775

776+
private static String fromJsonCase(final String str) {
777+
final StringBuilder sb = new StringBuilder();
778+
for (int i = 0; i < str.length(); i++) {
779+
char c = str.charAt(i);
780+
if (Character.isUpperCase(c)) {
781+
if (i != 0) {
782+
sb.append("_");
783+
}
784+
sb.append(Character.toLowerCase(c));
785+
} else {
786+
sb.append(c);
787+
}
788+
}
789+
return sb.toString();
790+
}
791+
766792
private void oneofDefinitionFromConnectSchema(
767793
FromConnectContext ctx,
768794
DynamicSchema.Builder schema,
@@ -1363,7 +1389,7 @@ private void setStructField(
13631389
Struct result,
13641390
FieldDescriptor fieldDescriptor
13651391
) {
1366-
final String fieldName = fieldDescriptor.getName();
1392+
final String fieldName = useJsonFieldNames ? fieldDescriptor.getJsonName() : fieldDescriptor.getName();
13671393
final Field field = schema.field(fieldName);
13681394
if ((isPrimitiveOrRepeated(fieldDescriptor) && !isOptional(fieldDescriptor))
13691395
|| (generateStructForNulls || message.hasField(fieldDescriptor))) {
@@ -1425,7 +1451,8 @@ private SchemaBuilder toConnectSchema(
14251451
// Already added field as oneof
14261452
continue;
14271453
}
1428-
builder.field(fieldDescriptor.getName(), toConnectSchema(ctx, fieldDescriptor));
1454+
final String fieldName = useJsonFieldNames ? fieldDescriptor.getJsonName() : fieldDescriptor.getName();
1455+
builder.field(fieldName, toConnectSchema(ctx, fieldDescriptor));
14291456
}
14301457
}
14311458

protobuf-converter/src/main/java/io/confluent/connect/protobuf/ProtobufDataConfig.java

+13-1
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,11 @@ public class ProtobufDataConfig extends AbstractDataConfig {
6969
public static final String GENERATE_INDEX_FOR_UNIONS_DOC = "Whether to suffix union"
7070
+ "names with an underscore followed by an index";
7171

72+
public static final String JSON_FIELD_NAMES_CONFIG = "json.field.names";
73+
public static final boolean JSON_FIELD_NAMES_DEFAULT = false;
74+
public static final String JSON_FIELD_NAMES_DOC = "Whether to convert protobuf field names "
75+
+ "to camelcase for internal data representation and vice-versa.";
76+
7277
public static ConfigDef baseConfigDef() {
7378
return AbstractDataConfig.baseConfigDef()
7479
.define(ENHANCED_PROTOBUF_SCHEMA_SUPPORT_CONFIG,
@@ -112,7 +117,12 @@ public static ConfigDef baseConfigDef() {
112117
ConfigDef.Type.BOOLEAN,
113118
GENERATE_INDEX_FOR_UNIONS_DEFAULT,
114119
ConfigDef.Importance.LOW,
115-
GENERATE_INDEX_FOR_UNIONS_DOC
120+
GENERATE_INDEX_FOR_UNIONS_DOC)
121+
.define(JSON_FIELD_NAMES_CONFIG,
122+
ConfigDef.Type.BOOLEAN,
123+
JSON_FIELD_NAMES_DEFAULT,
124+
ConfigDef.Importance.LOW,
125+
JSON_FIELD_NAMES_DOC
116126
);
117127
}
118128

@@ -156,6 +166,8 @@ public boolean generateIndexForUnions() {
156166
return this.getBoolean(GENERATE_INDEX_FOR_UNIONS_CONFIG);
157167
}
158168

169+
public boolean useJsonFieldNames() { return this.getBoolean(JSON_FIELD_NAMES_CONFIG); }
170+
159171
public static class Builder {
160172

161173
private final Map<String, Object> props = new HashMap<>();

protobuf-converter/src/test/java/io/confluent/connect/protobuf/ProtobufConverterTest.java

+60-9
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
import com.google.common.collect.ImmutableList;
2020
import com.google.common.collect.ImmutableMap;
21+
import com.google.protobuf.Descriptors;
2122
import com.google.protobuf.Descriptors.Descriptor;
2223
import com.google.protobuf.ListValue;
2324
import com.google.protobuf.Timestamp;
@@ -28,21 +29,16 @@
2829
import io.confluent.kafka.serializers.protobuf.test.KeyTimestampValueOuterClass.KeyTimestampValue;
2930
import io.confluent.kafka.serializers.protobuf.test.TestMessageProtos.TestMessage2;
3031
import io.confluent.kafka.serializers.protobuf.test.TimestampValueOuterClass.TimestampValue;
31-
import java.util.List;
32-
import org.apache.kafka.connect.data.Schema;
33-
import org.apache.kafka.connect.data.SchemaAndValue;
34-
import org.apache.kafka.connect.data.SchemaBuilder;
35-
import org.apache.kafka.connect.data.Struct;
32+
33+
import java.util.*;
34+
35+
import org.apache.kafka.connect.data.*;
3636
import org.junit.Before;
3737
import org.junit.Ignore;
3838
import org.junit.Test;
3939

4040
import java.io.IOException;
4141
import java.nio.ByteBuffer;
42-
import java.util.Arrays;
43-
import java.util.Collections;
44-
import java.util.HashMap;
45-
import java.util.Map;
4642

4743
import io.confluent.connect.protobuf.test.Key;
4844
import io.confluent.connect.protobuf.test.KeyValue;
@@ -281,6 +277,23 @@ public void testFromConnectDataForValue() {
281277
assertArrayEquals(expected, Arrays.copyOfRange(result, PROTOBUF_BYTES_START, result.length));
282278
}
283279

280+
@Test
281+
public void testFromConnectDataForValueUseJsonFieldNames() {
282+
final byte[] expected = HELLO_WORLD_MESSAGE.toByteArray();
283+
284+
Map<String, Object> configs = new HashMap<>(SR_CONFIG);
285+
configs.put(ProtobufDataConfig.JSON_FIELD_NAMES_CONFIG, true);
286+
converter.configure(configs, false);
287+
288+
SchemaAndValue schemaAndValue = getExpectedTestMessageWithJsonFieldNames();
289+
290+
byte[] result = converter.fromConnectData("my-topic",
291+
schemaAndValue.schema(), schemaAndValue.value()
292+
);
293+
294+
assertArrayEquals(expected, Arrays.copyOfRange(result, PROTOBUF_BYTES_START, result.length));
295+
}
296+
284297
@Test
285298
public void testFromConnectDataForValueWithNamespace() {
286299
final byte[] expected = HELLO_WORLD_MESSAGE.toByteArray();
@@ -531,6 +544,44 @@ public void testToConnectDataForValue() throws Exception {
531544
assertEquals(expected, result);
532545
}
533546

547+
@Test
548+
public void testToConnectDataForValueUseJsonFieldNames() throws Exception {
549+
Map<String, Object> configs = new HashMap<>(SR_CONFIG);
550+
configs.put(ProtobufDataConfig.JSON_FIELD_NAMES_CONFIG, true);
551+
converter.configure(configs, false);
552+
// extra byte for message index
553+
final byte[] input = concat(new byte[]{0, 0, 0, 0, 1, 0}, HELLO_WORLD_MESSAGE.toByteArray());
554+
schemaRegistry.register("my-topic-value", getSchema(TestMessage.getDescriptor()));
555+
SchemaAndValue result = converter.toConnectData("my-topic", input);
556+
557+
SchemaAndValue expected = getExpectedTestMessageWithJsonFieldNames();
558+
559+
assertEquals(expected.schema(), result.schema());
560+
assertEquals(expected, result);
561+
}
562+
563+
private SchemaAndValue getExpectedTestMessageWithJsonFieldNames() {
564+
Struct testMessageStruct = getTestMessageStruct(TEST_MSG_STRING, 123);
565+
Schema testMessageSchema = getTestMessageSchema();
566+
567+
final SchemaBuilder builder = SchemaBuilder.struct();
568+
builder.name("TestMessage").version(1);
569+
List values = new ArrayList<>();
570+
for (Field field : testMessageSchema.fields()) {
571+
String jsonFieldName = TestMessage.getDescriptor()
572+
.findFieldByName(field.name()).getJsonName();
573+
builder.field(jsonFieldName, field.schema());
574+
values.add(testMessageStruct.get(field));
575+
}
576+
final Schema jsonSchema = builder.build();
577+
final Struct jsonStruct = new Struct(jsonSchema);
578+
final Iterator<Object> valuesIt = values.iterator();
579+
for (Field field : jsonSchema.fields()) {
580+
jsonStruct.put(field, valuesIt.next());
581+
}
582+
return new SchemaAndValue(jsonSchema, jsonStruct);
583+
}
584+
534585
@Test
535586
public void testToConnectDataForValueWithSecondMessage() throws Exception {
536587
converter.configure(SR_CONFIG, false);

0 commit comments

Comments
 (0)