JSON Schema Evolution with Spring Kafka and Confluent Schema Registry

Introduction

Schemas and schema registries are the governance tools that ensure vitality of data communications through configurable guardrails in the form of patterns and rules to evolve data structures in a controlled fashion. These schemas represent the agreed data contracts between producers and consumers specifying the terms of data format along with ways it may change. In this article I will demonstrate how Confluent Schema Registry can be used in conjunction with JSON Schema to govern and enforce rules for schema evolution in the default backwards compatibility mode. However, I encourage interested readers to similarly experiment with other compatibility modes and schema changes to gain a deeper understanding of what to expect when working with these technologies.

Walkthrough of Demo App Specifications


For this article I will be using Spring Boot 3 with Spring for Kafka along with a containerized environment for both Kafka and Confluent Schema Registry. The project is available on GitHub so I invite readers to consult the build.gradle files for specifics on dependencies along with the README for instructions on running the sample projects.

To start I've defined a schema with a minimal set of fields representing a person to be shared over a Kafka topic. The schema defines a required name field and an optional dob field. This schema can be found within the file people-schemas/src/main/resources/json/person.json

{
  "$schema": "http://json-schema.org/draft-07/schema#",
  "title": "Person",
  "description": "Schema representing a person",
  "type": "object",
  "additionalProperties": true,
  "properties": {
    "name": {
      "type": "string",
      "description": "Name of person."
    },
    "dob": {
      "type": "string",
      "format": "date-time",
      "description": "Birth date for person."
    }
  },
  "required": ["name"]
}

This project is using the jsonschema2pojo-gradle plugin to convert the JSON Schema to a versioned and sharable jar consiting of a JSON Schema enabled Person Java class. Below are the configurations I'm using for the jsonschema2pojo plugin.

jsonSchema2Pojo {
    targetPackage = 'com.thecodinginterface.peopleschemas'
    generateBuilders = true
    useTitleAsClassname = true
    useInnerClassBuilders = true
    includeAdditionalProperties = true
    includeConstructors = true
    includeJsr303Annotations = true
}

The first execution of the gradle assemble task generated an initial JSON Schema enabled Plain Ole Java Object (POJO) enclosed in people-schemas/build/libs/people-schemas-1.0.0.jar. This jar file was then shared with the people-producer and people-consumer projects by placing it in their respective libs directory and referenced in build.gradle. In this sample project I will be producing to and consuming from a topic named people.json-schema.

Below is a diagram showing how the producers and consumers integrate with the Confluent Schema Registry and Kafka. 

Adding New Optional Fields

Consider a requirement to optionally capture and transmit a nickname in this Person schema. Lets also say I was very absent minded and forget to update the schema and went ahead and told my colleagues they can start sending nicknames along with their person data of name and dob. Interestingly, because I've configured my JSON Schema as an open content model by setting additionalProperties to true any extra fields simply get captured within the additionalProperties object field of the resulting serialized and deserialized message passing through the Kafka topic.

You can prove this to yourself by spinning up the containerized environment along with the producer and consumer apps using the 1.0.0 version of the people-schemas jar then fire off the following POST request.

curl -X POST localhost:8080/api/people \
  -H 'accept: */*' \
  -H 'Content-Type: application/json' \
  -d '{
    "name": "John Doe",
    "dob": "2009-02-12T21:57:17",
    "nickname": "JD"
  }'

Checking the logs of the consumer app you can clearly see the capture of the nickname field within additionalProperties because the current schema definition didn't know what to do with it.

INFO --- [people-consumer] ... : Consumed com.thecodinginterface.peopleschemas.Person@200d261a[name=John Doe,dob=Thu Feb 12 15:57:17 CST 2009,additionalProperties={nickname=JD}]

To remediate my forgetful blunder I add a new nickname field to the schema within person.json as seen below.

{
  "$schema": "http://json-schema.org/draft-07/schema#",
  "title": "Person",
  "description": "Schema representing a person",
  "type": "object",
  "additionalProperties": true,
  "properties": {
    "name": {
      "type": "string",
      "description": "Name of person."
    },
    "dob": {
      "type": "string",
      "format": "date-time",
      "description": "Birth date for person."
    },
    "nickname": {
      "type": "string",
      "description": "Preferred or common name"
    }
  },
  "required": ["name"]
}

Next I incremented the version within people-schemas/build.gradle from 1.0.0 to 1.1.0 and reran the assemble task producing the new distributable jar people-schemas/build/libs/people-schemas-1.1.0.jar. At this point it is now useful to consult Confluent's Schema Compatibility and Evolution Matrix to see how to introduce changes based off the compatibility mode associated with the schema registry for subjects where a subject represents the unique combination of a message's key or value schema plus the associated topic name its being produced to and consumed out of.

Given the schema registry in this example is using the default mode of BACKWARD compatibility its advised to update the consumer first, so I added the people-schemas-1.1.0.jar in the people-consumer project. If I again send a POST of a person containing the nickname field before updating the people-producer project with the new 1.1.0 schema the previous 1.0.0 schemas are flexibile enough to still publish the message with the supplied nickname. However, the newly updated consumer using the 1.1.0 version defining the nickname field properly deserializes messages published with the 1.0.0 schema thus you see the nickname as a proper field rather than within the additionalProperties object as seen in the consumer's logs.

INFO --- [people-consumer] ... : Consumed com.thecodinginterface.peopleschemas.Person@43c8b6a9[name=John Doe,dob=Thu Feb 12 15:57:17 CST 2009,nickname=JD,additionalProperties={}]

Of course, that was simply an experiment to understand the flexible behavior of the JSON Schema in the presence of new optional fields, so I updated the people-producer project to utilize the new 1.1.0 schema containing the nickname field as well.

To recap, when supplying a new optional field to a producer of a JSON Schema backed message, under backward compatibility, if the consumer isn't updated to use a compliant schema then new optional fields will be captured in the additionalProperties field object. Updating the consumer with a new schema version caused it to recognize the newly introduced optional fields even if produced by an older version of the schema (ie, it exhibits backward compatability from the point of view of the consumer).

Adding New Required Fields

Consider a requirement for mandatory capture and transmittence of a person's country of residence. Introducing a new mandatory field to a data contract is generally considered a breaking change. Breaking changes are a fact of life for developers supporting modern business systems in this world of hyper competition so its important to have a plan for how to handle them well before they become a story in your team's backlog or the ever popular "I need this updated yesterday" mindset many of us face.

Lets say I naively go ahead and update my schema to add the new required country field like seen below.

{
  "$schema": "http://json-schema.org/draft-07/schema#",
  "title": "Person",
  "description": "Schema representing a person",
  "type": "object",
  "additionalProperties": true,
  "properties": {
    "name": {
      "type": "string",
      "description": "Name of person."
    },
    "dob": {
      "type": "string",
      "format": "date-time",
      "description": "Birth date for person."
    },
    "nickname": {
      "type": "string",
      "description": "Preferred or common name"
    },
    "country": {
      "type": "string",
      "description": "Country of residence"
    }
  },
  "required": ["name", "country"]
}

I then incremented the version field in build.gradle from 1.1.0 to 2.0.0, executed the assemble task to create a new people-schemas-2.0.0.jar, and distributed it to the producer then fired off a new POST that includes the country of the fictitious John Doe from Canada like so.

curl -X POST localhost:8080/api/people \
  -H 'accept: */*' \
  -H 'Content-Type: application/json' \
  -d '{
    "name": "John Doe",
    "dob": "2009-02-12T21:57:17",
    "nickname": "JD",
    "country": "Canada"
  }'

I see the following error in the logs letting me know I just introduced a breaking change which subsequently halts the producer from poluting the topic with data consumers are not fit to handle.

ERROR --- [people-producer] ... : Exception thrown when sending a message with key='null' and payload='com.thecodinginterface.peopleschemas.Person@3742863f[name=John Doe,dob=Thu Feb 12 15:57:17 CST 2009,...' to topic people.json-schema:

    org.apache.kafka.common.errors.InvalidConfigurationException: Schema being registered is incompatible with an earlier schema for subject "people.json-schema-value"; error code: 409

So what then are my options at this point?

One way is to introduce a new Kafka topic, say people-v2.json-schema, start publishing the new version 2.0.0 schema based person messages to the new topic and ask for consumers to either (a) add new consumers to the new topic with the latest schema to get the new version of the person message while dually consuming the previous topics exisitng data or, (b) if data processing new messages can wait, allow current consumers to process all existing in-flight messages in the original topic, then upgrade those existing consumers with the new schema and point them to the new topic.

In another approach, I could also simply remove the currently registered subject's schema versions associated with the original people.json-schema topic using the Confluent Schema Registry REST API and go ahead and update all my producers and consumers with the new 2.0.0 schema version.

curl -X DELETE localhost:8081/subjects/people.json-schema-value

When I try to send the same person representation with the manadatory country field it successfully gets published by the producer as well as deserialized and read by the consumer. All seems right in the world and there was no need to introduce a new Kafka topic, just update the producers and consumers with most recent versions of the schema and you're good to go consuming new data. But what about the old messages that are still hanging out in the existing people.json-schema topic? How does this approach affect the consumability of previously published messages which utilized the now non-existent schema versions given the consumer is pinned to the most recent 2.0.0 version knowing that messages live in the topic which don't have the required country field?

Lets take a look by simply resetting the consumer offsets for the people-consumer app and the people.json-schema topic like so (you'll need to shutdown the consumer to do this).

$ docker exec -it broker kafka-consumer-groups --bootstrap-server localhost:9092 --describe --all-groups

Consumer group 'people-consumer' has no active members.

GROUP           TOPIC              PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-ID
people-consumer people.json-schema 0          2               2               0               -               -               -
people-consumer people.json-schema 2          1               1               0               -               -               -

$ docker exec -it broker kafka-consumer-groups --bootstrap-server localhost:9092 --group people-consumer --reset-offsets --to-earliest --topic people.json-schema --execute

GROUP                          TOPIC                          PARTITION  NEW-OFFSET
people-consumer                people.json-schema             0          0
people-consumer                people.json-schema             1          0
people-consumer                people.json-schema             2          0

After restarting the consumer app and watching its logs you'll see the flexibility of the open content model JSON Schema is able to navigate the variations in the schema. Now, depending on what this means to a consumer and the logic that it uses it may or may not be able to logically handle this type of situation where deserialization simply nulls the missing required fields. In other words, your code's logic needs to know what to do with it but deserialization won't be a problem.

INFO --- [people-consumer] ... : Consumed com.thecodinginterface.peopleschemas.Person@2da27513[name=John Doe,dob=Thu Feb 12 15:57:17 CST 2009,nickname=<null>,country=<null>,additionalProperties={}]
INFO --- [people-consumer] ... : Consumed com.thecodinginterface.peopleschemas.Person@3ccc5f93[name=John Doe,dob=Thu Feb 12 15:57:17 CST 2009,nickname=JD,country=<null>,additionalProperties={}]
INFO --- [people-consumer] ... : Consumed com.thecodinginterface.peopleschemas.Person@51735050[name=John Doe,dob=Thu Feb 12 15:57:17 CST 2009,nickname=JD,country=Canada,additionalProperties={}]

To summarize, adding a new required field is generally seen as a breaking change requiring some additional planning and strategy to safely introduce such a change but, the Confluent Schema Registry will definitely loudly throw and exception in your producer letting you know.

Removing Optional Fields

Sometimes we may be compelled to remove fields from a schema in order to reduce the size of it or to negate complexities associated with similarly named fields yet with semmantically different meanings. This could also be needed to evolve a field's data type.

Lets say I get a requirement to remove the optional dob field as that as has been deemed to sensitive of data to collect going forward. This would result in a schema update shown below.

{
  "$schema": "http://json-schema.org/draft-07/schema#",
  "title": "Person",
  "description": "Schema representing a person",
  "type": "object",
  "additionalProperties": true,
  "properties": {
    "name": {
      "type": "string",
      "description": "Name of person."
    },
    "nickname": {
      "type": "string",
      "description": "Preferred or common name"
    },
    "country": {
      "type": "string",
      "description": "Country of residence"
    }
  },
  "required": ["name", "country"]
}

Again, I incremented the build.gradle version field from 2.0.0 to 2.1.0 and updated the consumer then producer as suggested by the Confluent Schema Registry docs for backwards compatibility evolution and finally test with a new message representing my John Doe person but leaving out the dob field.

curl -X POST localhost:8080/api/people \
  -H 'accept: */*' \
  -H 'Content-Type: application/json' \
  -d '{
    "name": "John Doe",
    "nickname": "JD",
    "country": "Canada"
  }'

Uh oh ... I get an internal server error with the following error in the producer app's logs as shown below.

ERROR --- [people-producer] ... : Exception thrown when sending a message with key='null' and payload='com.thecodinginterface.peopleschemas.Person@4a4caaad[name=John Doe,nickname=JD,country=Canada,additi...' to topic people.json-schema:

    org.apache.kafka.common.errors.InvalidConfigurationException: Schema being registered is incompatible with an earlier schema for subject "people.json-schema-value"; error code: 409

I have to admit this is a case where I think the Confluent folks haven't implemented the rules appropriately for backwards compatibility as removing an optional field is not something I'd typically see as a breaking change to a data contract. Of course, we as engineers still need to figure out how to get around this as our product teams are not going to care what issues may or may not exist in Confluent Schema Registry. Luckily, the same two approaches described in the section on "Adding New Required Fields" will work.

You can either introduce a new topic to publish the new schema of person messages to without the dob field and migrate the consumers over time as they exhaust the existing topic's messages or you could blow away the schema versions from schema registry, update all producers along with consumers to the new 2.1.0 schemas, and stay on the current topic. Again though, we need to be cautious about how that will affect the consumers ability to replay those existing messages that have a dob field present yet is absent from the current schema version consumers are using (ie, the 2.1.0 schema that doesn't have any knowledge of the dob field).

If I again delete the subject versions and reset my consumer group's offsets to earliest.

$ curl -X DELETE localhost:8081/subjects/people.json-schema-value
[3]%

$ docker exec -it broker kafka-consumer-groups --bootstrap-server localhost:9092 --group people-consumer --reset-offsets --to-earliest --topic people.json-schema --execute

GROUP                          TOPIC                          PARTITION  NEW-OFFSET
people-consumer                people.json-schema             0          0
people-consumer                people.json-schema             1          0
people-consumer                people.json-schema             2          0

 

Then watch the logs in the people-consumer app I can see that it is able to both consume new messages absent of the dob field as well as the existing messages that contain the dob field by capturing it in the additionalProperties object.

INFO --- [people-consumer] ... : Consumed com.thecodinginterface.peopleschemas.Person@6054e45b[name=John Doe,nickname=JD,country=<null>,additionalProperties={dob=1234475837000}]
INFO --- [people-consumer] ... : Consumed com.thecodinginterface.peopleschemas.Person@2724488e[name=John Doe,nickname=JD,country=<null>,additionalProperties={dob=1234475837000}]
INFO --- [people-consumer] ... : Consumed com.thecodinginterface.peopleschemas.Person@2bf4839c[name=John Doe,nickname=JD,country=Canada,additionalProperties={dob=1234475837000}]
INFO --- [people-consumer] ... : Consumed com.thecodinginterface.peopleschemas.Person@43249cda[name=John Doe,nickname=JD,country=Canada,additionalProperties={}]

In this section we saw that removing an optional field is interpretted as a breaking change by the Confluent Schema Registry when working with JSON Schema based messages. Then we also saw how to navigate or otherwise remediate this situation.

Removing Required Fields

If I were faced with a requirement to change the schema by removing an existing required field this would be seen as a breaking change. Luckily the solution would be to treat such a situation the same as described in the section on removing an optional field.

References for Additional Learning

Apache Kafka Crash Course

Evolving JSON Schema

Schema Evolution and Compatibility

Understanding JSON Schema Compatibility

Yes, Virginia, You Really Do Need Schema Registry

Share with friends and colleagues

[[ likes ]] likes

Navigation

Community favorites for Data Engineering

theCodingInterface