Hi @LathaSree
Thank you for contacting Microsoft Q&A. Please find below the detailed steps to address the reported issue.
This failure is not caused by your payload or AMQP pipeline; your own diagnostic confirms that AMQP consumers read the full 1190‑byte JSON correctly. The corruption occurs only on the Kafka endpoint, and the error:
CorruptRecordError: Invalid record size: expected 1190 bytes … read 6
is a classic symptom of a Kafka client using an older message format version than the broker expects.
Azure Event Hubs' Kafka endpoint supports Kafka API version 1.0+ only (and internally aligns with the Kafka 1.0+ message format). When the kafka-python producer is explicitly forced to use:
api_version = (0, 10)
It switches to the Kafka 0.10 message format, which Event Hubs does NOT support. Event Hubs still records the message length metadata, but reading the body fails because the consumer expects a Kafka‑0.10 format envelope that Event Hubs never produced.
This mismatch leads directly to the “expected X bytes, read Y bytes” corruption behavior.
This is why AMQP works (AMQP path is unaffected) and both Kafka consumers fail (both read from the Kafka endpoint using a mismatched wire format).
Solution
1. Remove or update the forced API version
Do NOT force api_version = (0, 10) in kafka-python. Instead, use API auto‑detection:
Python
producer = KafkaProducer(
    bootstrap_servers=["<your-eventhub-namespace>.servicebus.windows.net:9093"],
    security_protocol="SASL_SSL",
    sasl_mechanism="PLAIN",
    sasl_plain_username="$ConnectionString",
    sasl_plain_password=event_hubs_connection_string,
    api_version_auto_timeout_ms=60000,  # allow auto-negotiation to complete
)
Or explicitly set the correct modern API version:
Python
api_version = (1, 0)  # or higher, depending on your kafka-python version
Event Hubs requires Kafka 1.0+ (per Microsoft Learn: Troubleshoot connectivity issues - Azure Event Hubs | Microsoft Learn).
This single change resolves the corrupted‑record behavior.
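As a sketch, the settings above can be collected in a small helper that builds the producer configuration. The namespace FQDN, connection string, and helper name below are placeholders of my own, not values from your environment:

```python
# Sketch: build kafka-python producer settings for an Event Hubs namespace.
# All concrete names here are placeholder assumptions.

def eventhub_producer_config(namespace_fqdn, connection_string):
    """Return KafkaProducer keyword arguments for Event Hubs' Kafka endpoint."""
    return {
        "bootstrap_servers": [f"{namespace_fqdn}:9093"],
        "security_protocol": "SASL_SSL",
        "sasl_mechanism": "PLAIN",
        "sasl_plain_username": "$ConnectionString",
        "sasl_plain_password": connection_string,
        # Pin a modern API version instead of (0, 10); Event Hubs needs 1.0+.
        "api_version": (1, 0),
    }

config = eventhub_producer_config(
    "mynamespace.servicebus.windows.net",
    "Endpoint=sb://...",  # substitute your real connection string
)
```

With kafka-python installed and the namespace reachable, you would instantiate the client as `KafkaProducer(**config)`.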
2. Validate your compression configuration
If you're using Snappy (which is common in Kafka), be aware that Event Hubs supports only GZIP for Kafka messages. Snappy or LZ4 cause partial reads or decode failures.
From the official docs on compression limits: Configure Azure Event Hubs and Kafka data flow endpoints in Azure IoT Operations | Microsoft Learn
Ensure your producer includes:
Python
compression_type="gzip"
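A small guard in your producer setup can catch an unsupported codec before any bytes are sent. This is just a sketch; the supported set below simply encodes the GZIP-only limit described above:

```python
# Sketch: reject compression codecs Event Hubs' Kafka endpoint does not accept.
SUPPORTED_COMPRESSION = {None, "gzip"}  # per the GZIP-only limit above

def check_compression(compression_type):
    """Raise early if the codec would cause partial reads on Event Hubs."""
    if compression_type not in SUPPORTED_COMPRESSION:
        raise ValueError(
            f"compression_type={compression_type!r} is not supported by "
            "Event Hubs; use 'gzip' or None"
        )
    return compression_type

check_compression("gzip")  # accepted
```

Calling `check_compression("snappy")` raises a `ValueError` instead of failing later with a corrupted-record symptom on the consumer side.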
3. Ensure your consumer also uses Kafka 1.0+
For kafka-python consumers, remove any forced version or set:
Python
api_version = (1, 0)
This matches Event Hubs' Kafka wire format and stops the corruption. confluent-kafka (librdkafka) negotiates the API version automatically, so there simply remove any configuration that pins the client to a pre-1.0 broker version.
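On the consuming side, the same idea applies. A minimal kafka-python consumer configuration might look like the sketch below; the namespace, group id, and helper name are placeholder assumptions:

```python
# Sketch: kafka-python consumer settings for Event Hubs (names are placeholders).

def eventhub_consumer_config(namespace_fqdn, connection_string, group_id):
    """Return KafkaConsumer keyword arguments matching the producer's format."""
    return {
        "bootstrap_servers": [f"{namespace_fqdn}:9093"],
        "security_protocol": "SASL_SSL",
        "sasl_mechanism": "PLAIN",
        "sasl_plain_username": "$ConnectionString",
        "sasl_plain_password": connection_string,
        "group_id": group_id,
        # Match the producer: Kafka 1.0+ wire format, never (0, 10).
        "api_version": (1, 0),
    }

config = eventhub_consumer_config(
    "mynamespace.servicebus.windows.net",
    "Endpoint=sb://...",  # substitute your real connection string
    "my-consumer-group",
)
```

You would then subscribe with `KafkaConsumer("my-topic", **config)`, where `my-topic` is the Event Hub name.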