Skip to content

Commit

Permalink
Fix cache hit avro data (#5637)
Browse files Browse the repository at this point in the history
  • Loading branch information
carlesarnal authored Dec 2, 2024
1 parent 38f0949 commit bebcf82
Show file tree
Hide file tree
Showing 3 changed files with 73 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -313,7 +313,7 @@ public Object convert(Schema schema, Object value) {

private int unionIndex = 0;

private Cache<Schema, org.apache.avro.Schema> fromConnectSchemaCache;
private Cache<AvroDataSchemaCacheKey, org.apache.avro.Schema> fromConnectSchemaCache;
private Cache<AvroSchemaAndVersion, Schema> toConnectSchemaCache;
private boolean connectMetaData;
private boolean generalizedSumTypeSupport;
Expand Down Expand Up @@ -793,14 +793,15 @@ public org.apache.avro.Schema fromConnectSchema(Schema schema,
return ANYTHING_SCHEMA;
}

org.apache.avro.Schema cached = fromConnectSchemaCache.get(schema);
AvroDataSchemaCacheKey cacheKey = new AvroDataSchemaCacheKey(schema);
org.apache.avro.Schema cached = fromConnectSchemaCache.get(cacheKey);
if (cached != null) {
return cached;
}

FromConnectContext fromConnectContext = new FromConnectContext(schemaMap);
org.apache.avro.Schema finalSchema = fromConnectSchema(schema, fromConnectContext, false);
fromConnectSchemaCache.put(schema, finalSchema);
fromConnectSchemaCache.put(cacheKey, finalSchema);
return finalSchema;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package io.apicurio.registry.utils.converter.avro;

import org.apache.kafka.connect.data.Schema;

import java.util.Map;
import java.util.Objects;

public class AvroDataSchemaCacheKey {
private final String schemaName;
private final Map<String, String> parameters;

public AvroDataSchemaCacheKey(Schema schema) {
this.schemaName = schema.name();
this.parameters = schema.parameters();
}

@Override
public boolean equals(Object obj) {
if (this == obj) return true;
if (obj == null || getClass() != obj.getClass()) return false;
AvroDataSchemaCacheKey other = (AvroDataSchemaCacheKey) obj;
return Objects.equals(schemaName, other.schemaName) &&
Objects.equals(parameters, other.parameters);
}

@Override
public int hashCode() {
return Objects.hash(schemaName, parameters);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,13 @@
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.GenericRecordBuilder;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertSame;

public class AvroDataTest {

@Test
Expand Down Expand Up @@ -116,4 +120,39 @@ public void testDecimal() {
org.apache.avro.Schema aSchema = avroData.fromConnectSchema(avroData.toConnectSchema(bSchema));
Assertions.assertEquals(bSchema.toString(), aSchema.toString());
}

@Test
void testCacheDistinguishesByParameters() {
AvroData avroData = new AvroData(5);
// Create two Connect schemas with the same name but different parameters
Schema schemaWithPrecision10 = createConnectSchema("io.debezium.data.VariableScaleDecimal", "10");
Schema schemaWithPrecision15 = createConnectSchema("io.debezium.data.VariableScaleDecimal", "15");

// Generate Avro schemas for both
org.apache.avro.Schema avroSchema1 = avroData.fromConnectSchema(schemaWithPrecision10);
org.apache.avro.Schema avroSchema2 = avroData.fromConnectSchema(schemaWithPrecision15);

// Verify that the two schemas are different
assertNotEquals(avroSchema1, avroSchema2, "Avro schemas with different parameters should not be equal");

// Verify that repeated calls with the same schema return the same instance (cache hit)
org.apache.avro.Schema avroSchema1Again = avroData.fromConnectSchema(schemaWithPrecision10);
assertSame(avroSchema1, avroSchema1Again, "Repeated calls with the same schema should return the cached instance");

org.apache.avro.Schema avroSchema2Again = avroData.fromConnectSchema(schemaWithPrecision15);
assertSame(avroSchema2, avroSchema2Again, "Repeated calls with the same schema should return the cached instance");
}

private Schema createConnectSchema(String name, String precision) {
// Create a struct schema similar to the example provided
return SchemaBuilder.struct()
.name(name)
.version(1)
.doc("Variable scaled decimal")
.field("scale", Schema.INT32_SCHEMA)
.field("value", Schema.BYTES_SCHEMA)
.parameter("precision", precision)
.optional()
.build();
}
}

0 comments on commit bebcf82

Please sign in to comment.