Skip to content

Kafka Message Format

Kafka messages are produced using the Avro serialization system. Avro encodes messages into a compact binary format, as fewer bytes are efficient for storage and transmission.

Serde

Serde is a combination of serialization and deserialization. Serialization is the process of converting an object into a byte stream, while deserialization is the reverse process of converting a byte stream back into an object. In the context of Kafka, serde is used to convert messages to and from their binary representation.

For example, let's take the word dog. When dog is encoded using UTF-16 big-endian, it beomes \x00\x64\x00\x6F\x00\x67 (in hexadecimal). This is what the producer sends to Kafka. When the consumer receives this message, it decodes the byte stream back into the string dog.

Because messages are bytes, the consumer requires a schema to understand what each sequence of bytes represents. This is where a schema registry comes into play. A central place where schemas are stored for producers and consumers to speak a common language.

The consumer needs to know what schema and version to decode messages sent by the producer. An artifact (or subject in Confluence) represents a schema or even an API design given a unique name. For artifact names, Apicurio has a default strategy called TopicNameStrategy where the kafka topic name is used as the artifact's name. Use -key or -value suffix after the topic name to specify if the artifact is for the key or value, as both can be in avro. For example, if you have a topic called safety-events and the message value is avro, given the TopicNameStrategy the artifact name would be safety-events-value. This strategy allows others to easily navigate the registry by naming artifacts based on the topic/suffix. The message format of a topic will be immediately clear to them.

An artifact can have multiple versions. Apicurio generates a new global ID for each new version of an artifact. For example, version 1.0.0 of the safety-events-value artifact has a global ID of 1, version 1.1.0 would have a global ID of 2, and so on. This global ID is what the producer needs to include at the beginning of the message to inform the consumer which schema version to use when decoding the message. Let's dig in a little deeper.

The Magic Byte

The magic byte is a single byte, most of the time it's 0, that resides at the beginning of avro messages. It's been said that the magic byte tells the consumer where to find the global ID. If the magic byte is missing, the consumer searches the message header for the global ID. Otherwise if present, the consumer knows the next 4 bytes is the global ID. Together, the magic byte and the global ID form a 5-byte prefix at the beginning of the message. This 5-byte prefix tells the consumer which schema version to use when decoding the message.

It looks like there is limited support for storing the global ID in the message header, as most the community is content with the 5-byte prefix.

Note: Use big-endian byte order for the global ID

Apicurio Endpoints

API V3

Get global ID for an artifact by name and version

GET /groups/default/artifacts/{artifactName}/versions/{version} - Returns metadata about the specified artifact (e.g. safety-events-value)

    {
        "version": "1",
        "description": "Initial version",
        "owner": "bob@vtti.vt.edu",
        "createdOn": "2025-07-01T21:45:36Z",
        "artifactType": "AVRO",
        "globalId": 18,
        "state": "ENABLED",
        "contentId": 11,
        "artifactId": "events-safety-value",
        "modifiedBy": "alice@vtti.vt.edu",
        "modifiedOn": "2025-07-01T21:45:36Z"
    } 

Get all versions for an artifact by name

GET /groups/default/artifacts/{artifactName}/versions

  • Returns all versions of the specified artifact (e.g. safety-events-value)
        {
            "count": 1,
            "versions": [
              {
                "description": "Initial version",
                "createdOn": "2025-07-02T21:45:36Z",
                "owner": "siverson@vtti.vt.edu",
                "artifactType": "AVRO",
                "state": "ENABLED",
                "globalId": 13,
                "version": "1",
                "contentId": 11,
                "artifactId": "derq.safetyInsights.iceberg",
                "modifiedBy": "siverson@vtti.vt.edu",
                "modifiedOn": "2025-07-02T21:45:36Z"
              }
            ]
        }
    

Get schema by global ID

GET /ids/globalIds/{globalId} - Returns the schema for the specified global ID

{
  "type": "record",
  "name": "safety-events-value",
  "fields": [
    {
      "name": "event_type",
      "type": "string"
    },
    {
      "name": "event_id",
      "type": "string"
    },
    {
      "name": "city",
      "type": "string"
    },
    {
      "name": "intersection",
      "type": "string"
    },
    {
      "name": "table",
      "type": "string"
    }
  ]
}

Linux users can access these endpoints with the curl command. For example, to obtain the global ID for the safety-events-value artifact, you can run:

curl -X GET "https://smart-cities-schemas.pre-prod.cloud.vtti.vt.edu:443/apis/registry/v3/groups/default/artifacts/safety-events-value/versions/1" | jq '.globalId'

Python Example

This short python script demonstrates how to produce a test derq message with the magic byte and global ID using the confluent-kafka library.

MAGIC_BYTE = 0                 
SCHEMA_REGISTRY_URL = "https://smart-cities-schemas.pre-prod.cloud.vtti.vt.edu:443/apis/registry/v3"
TOPIC = "safety-event"


def get_schema_id(artifact: str) -> int:
    """
    Look up artifact and return its global ID
    """
    resp = requests.get(
        f"{SCHEMA_REGISTRY_URL}/groups/default/artifacts/{artifact}/versions/1",
        timeout=5,
    )
    resp.raise_for_status()
    return resp.json()["globalId"]


def get_schema(schema_id: int) -> dict:
    """
    Fetch the schema by its global ID
    """
    resp = requests.get(
        f"{SCHEMA_REGISTRY_URL}/ids/globalIds/{schema_id}",
        timeout=5,
    )
    resp.raise_for_status()
    return resp.json()

def to_magic_byte_format(schema_id: int, schema_dict: dict, record: dict) -> bytes:
    """
    Build the 5-byte prefix and prepend to the avro-encoded record
    """
    # 1) Serialize the record with fastavro
    parsed_schema = parse_schema(schema_dict)
    buf = io.BytesIO()
    schemaless_writer(buf, parsed_schema, record)
    avro_bytes = buf.getvalue()

    # 2) Prepend magic byte + 4‑byte schema id (big‑endian)
    prefix = struct.pack(">bI", MAGIC_BYTE, schema_id)
    return prefix + avro_bytes


def main():

    sample_record = {
        "event_type"        : "IC",
        "event_id"          : "0685c1eeebdce150012ca897b",
        "time_at_site"      : 1757529600000000,
        "detection_area"    : "South Leg",
        "camera_id"         : "1",
        "city"              : "falls-church",
        "intersection"      : "maple_washington",
        "table"             : "falls-church.safety-event",
        "publish_timestamp" : 1757533200000000 
    }

    # the subject/artifact name to look up
    subject = f"safety-events-value"

    # get global Id
    schema_id = get_schema_id(subject)

    # get the schema with /ids/globalIds/{schema_id} endpoint
    user_schema = get_schema(schema_id)

    # convert the sample record to the magic byte format
    payload = to_magic_byte_format(schema_id, user_schema, sample_record)

    # Produce to Kafka
    producer = Producer({
        'bootstrap.servers': os.getenv('BOOTSTRAP_SERVERS'),
        'security.protocol': 'SASL_SSL',
        'sasl.mechanism': 'SCRAM-SHA-512',
        'sasl.username': os.getenv('KAFKA_USERNAME'),
        'sasl.password': os.getenv('KAFKA_PASSWORD'),
        'client.id': 'test-producer',
        'receive.message.max.bytes': 1213486200
        }
    )

    # produce avro message to topic
    producer.produce(TOPIC, 
                     value=payload, 
                     on_delivery=lambda err, msg: print(
        f"Delivered to {msg.topic()} partition {msg.partition()} offset {msg.offset()}" if err is None else f"ERROR: {err}"
    ))
    producer.flush()


if __name__ == "__main__":
    main()