Формат сообщения JSON — изменение потоковой передачи событий

Применимо к: SQL Server 2025 (17.x) Azure SQL DatabaseAzure SQL Managed Instance

В этой статье описывается формат JSON сообщения CloudEvents, который передается из SQL Server в Центры событий Azure при использовании функции потоковой передачи событий изменений (CES), представленной в SQL Server 2025 (17.x), Базе данных SQL Azure и Управляемом экземпляре SQL Azure.

Замечание

Потоковая передача событий изменений в настоящее время доступна в предварительной версии для:

Обзор

События, создаваемые потоковой передачей событий изменений, соответствуют спецификации CloudEvents , что упрощает интеграцию с системами, управляемыми событиями. Все ceS CloudEvents содержат 11 атрибутов (полей). CES можно настроить для сериализации CloudEvents в формате JSON (машинного кода) или в виде двоичного файла Avro. В следующих разделах этой статьи подробно описан формат сообщения, включая атрибуты CES CloudEvent и сериализацию.

При необходимости описания в этом разделе взяты из спецификации CloudEvent, которая содержит дополнительные сведения.

Атрибуты

  • specversion:

    • Тип данных: String
    • Обязательный атрибут CloudEvent
    • Версия спецификации CloudEvents, используемой событием. Это обеспечивает интерпретацию контекста.
  • type

    • Тип данных: String
    • Обязательный атрибут CloudEvent
    • Содержит значение, описывающее тип события, связанного с исходной вхождением. Формат этого определяется производителем и может включать такие сведения, как версия типа. Дополнительные сведения см. в разделе "Управление версиями CloudEvents".
    • Для событий Change Event Streaming в настоящее com.microsoft.SQL.CES.DML.V{n}время тип: , где {n} обозначает версию схемы события Microsoft Change Event Streaming DML.
      • Текущая последняя версия схемы — 1.
  • source

    • Тип данных: String
    • Обязательный атрибут CloudEvent
    • Определяет контекст, в котором произошло событие. Источник + идентификатор должен быть уникальным для каждого события. В настоящее время это поле всегда отправляется как \/ события, транслируемые из SQL.
  • id

    • Тип данных: String
    • Обязательный атрибут CloudEvent
    • Определяет событие. Производители должны убедиться, что источник + идентификатор является уникальным для каждого отдельного события. Если повторяющееся событие повторно (например, из-за сетевой ошибки) может иметь тот же идентификатор. Потребители могут предположить, что события с идентичным источником и идентификатором дублируются.
  • logicalid

    • Тип данных: String
    • Атрибут расширения
    • Разделенные сообщения (из-за ограничений размера msg центров событий) определяются общими логическими идентификаторами.
  • time

    • Тип данных: метка времени
    • Необязательный атрибут CloudEvent
    • UTC временная метка, когда коммит произошёл в рамках SQL-транзакции, которая изначально запускает потоковое событие.
  • datacontenttype

    • Тип данных: String
    • Необязательный атрибут CloudEvent
    • Тип контента значения данных. Этот атрибут позволяет данным переносить любой тип содержимого, в котором формат и кодировка могут отличаться от формата выбранного события. Например, событие, отображаемое с помощью формата конверта JSON, может содержать полезные данные XML в данных, и потребитель сообщает этому атрибуту значение application/xml. Правила для отображения содержимого данных для разных datacontenttype значений определены в спецификациях формата событий
  • operation

    • Тип данных: String
    • Расширение
    • Представляет тип SQL-операции, которая произошла:
      • INS для вставок
      • UPD для обновлений
      • DEL для удалений
  • segmentindex

    • Тип данных: целое число
    • Атрибут расширения
    • Индекс сегмента, обозначающий позицию сообщения в блоках логического сообщения. Индекс сегмента содержит сведения о том, где сообщение находится в последовательности фрагментов логического сообщения. Это поле всегда присутствует. Используйте поля logid, segmentindex + finalsegment, чтобы сортировать входящие события, представляющие большую SQL-нагрузку, разделённую на несколько событий.
  • finalsegment

    • Тип данных: Логический
    • Атрибут расширения
    • Указывает, является ли этот сегмент окончательным сегментом последовательности. Это поле всегда присутствует и помогает определить, было ли событие SQL, которое было слишком большим для настроенного максимального размера сообщения, было разделено на вложенные элементы.
  • data

    • Тип данных: String
    • Необязательный атрибут CloudEvent
    • Данные о событиях конкретного домена. Для CES данные — это строка, которую можно проанализировать как JSON. В этом формате JSON описывается изменение данных. Формат атрибута данных находится в формате атрибута Data.

Примеры

Пример сообщения JSON — вставка

{
  "specversion": "1.0",
  "type": "com.microsoft.SQL.CES.DML.V1",
  "source": "\/",
  "id": "d43f09a6-d13b-4902-86d4-17bdb5edb872",
  "logicalid": "9c8d4ad2-bf54-4f10-a96f-038af496997f:0000002C00000300017C:00000000000000000001",
  "time": "2025-03-14T16:45:20.650Z",
  "datacontenttype": "application\/json",
  "operation": "INS",
  "splitindex": 0,
  "splittotalcnt": 0,
  "data": "{\n  \"eventsource\": {\n    \"db\": \"db1\",\n    \"schema\": \"dbo\",\n    \"tbl\": \"Purchases\",\n    \"cols\": [\n      {\n        \"name\": \"purchase_id\",\n        \"type\": \"int\",\n        \"index\": 0\n      },\n      {\n        \"name\": \"customer_name\",\n        \"type\": \"varchar(100)\",\n        \"index\": 1\n      },\n      {\n        \"name\": \"product_id\",\n        \"type\": \"int\",\n        \"index\": 2\n      },\n      {\n        \"name\": \"product_name\",\n        \"type\": \"varchar(100)\",\n        \"index\": 3\n      },\n      {\n        \"name\": \"price_per_item\",\n        \"type\": \"int\",\n        \"index\": 4\n      },\n      {\n        \"name\": \"quantity\",\n        \"type\": \"int\",\n        \"index\": 5\n      },\n      {\n        \"name\": \"purchase_date\",\n        \"type\": \"datetime\",\n        \"index\": 6\n      },\n      {\n        \"name\": \"payment_method\",\n        \"type\": \"varchar(50)\",\n        \"index\": 7\n      }\n    ],\n    \"pkkey\": [\n      {\n        \"columnname\": \"purchase_id\",\n        \"value\": \"105\"\n      }\n    ]\n  },\n  \"eventrow\": {\n    \"old\": \"{}\",\n    \"current\": \"{\\\"purchase_id\\\": \\\"105\\\", \\\"customer_name\\\": \\\"Anna Doe\\\", \\\"product_id\\\": \\\"101\\\", \\\"product_name\\\": \\\"Game 2077\\\", \\\"price_per_item\\\": \\\"60\\\", \\\"quantity\\\": \\\"1\\\", \\\"purchase_date\\\": \\\"2025-03-14 16:45:01.000\\\", \\\"payment_method\\\": \\\"Credit Card\\\"}\"\n  }\n}"
}

Пример сообщения JSON — обновлен

{
  "specversion": "1.0",
  "type": "com.microsoft.SQL.CES.DML.V1",
  "source": "\/",
  "id": "c425575f-00bb-45cf-acec-c55fdc7d08cd",
  "logicalid": "9c8d4ad2-bf54-4f10-a96f-038af496997f:0000002C000003500004:00000000000000000001",
  "time": "2025-03-14T16:49:59.567Z",
  "datacontenttype": "application\/json",
  "operation": "UPD",
  "splitindex": 0,
  "splittotalcnt": 0,
  "data": "{\n  \"eventsource\": {\n    \"db\": \"db1\",\n    \"schema\": \"dbo\",\n    \"tbl\": \"Purchases\",\n    \"cols\": [\n      {\n        \"name\": \"purchase_id\",\n        \"type\": \"int\",\n        \"index\": 0\n      },\n      {\n        \"name\": \"customer_name\",\n        \"type\": \"varchar(100)\",\n        \"index\": 1\n      },\n      {\n        \"name\": \"product_id\",\n        \"type\": \"int\",\n        \"index\": 2\n      },\n      {\n        \"name\": \"product_name\",\n        \"type\": \"varchar(100)\",\n        \"index\": 3\n      },\n      {\n        \"name\": \"price_per_item\",\n        \"type\": \"int\",\n        \"index\": 4\n      },\n      {\n        \"name\": \"quantity\",\n        \"type\": \"int\",\n        \"index\": 5\n      },\n      {\n        \"name\": \"purchase_date\",\n        \"type\": \"datetime\",\n        \"index\": 6\n      },\n      {\n        \"name\": \"payment_method\",\n        \"type\": \"varchar(50)\",\n        \"index\": 7\n      }\n    ],\n    \"pkkey\": [\n      {\n        \"columnname\": \"purchase_id\",\n        \"value\": \"105\"\n      }\n    ]\n  },\n  \"eventrow\": {\n    \"old\": \"{}\",\n    \"current\": \"{\\\"purchase_id\\\": \\\"105\\\", \\\"customer_name\\\": \\\"Anna Doe\\\", \\\"product_id\\\": \\\"100\\\", \\\"product_name\\\": \\\"Game 2066\\\", \\\"price_per_item\\\": \\\"50\\\", \\\"quantity\\\": \\\"2\\\", \\\"purchase_date\\\": \\\"2025-03-14 16:45:01.000\\\", \\\"payment_method\\\": \\\"Credit Card\\\"}\"\n  }\n}"
}

Пример сообщения JSON — удаление

{
  "specversion": "1.0",
  "type": "com.microsoft.SQL.CES.DML.V1",
  "source": "\/",
  "id": "24fa0c2c-c45d-4abf-9a8d-fba04c29fc86",
  "logicalid": "9c8d4ad2-bf54-4f10-a96f-038af496997f:0000002C000003600019:00000000000000000001",
  "time": "2025-03-14T16:51:39.613Z",
  "datacontenttype": "application\/json",
  "operation": "DEL",
  "splitindex": 0,
  "splittotalcnt": 0,
  "data": "{\n  \"eventsource\": {\n    \"db\": \"db1\",\n    \"schema\": \"dbo\",\n    \"tbl\": \"Purchases\",\n    \"cols\": [\n      {\n        \"name\": \"purchase_id\",\n        \"type\": \"int\",\n        \"index\": 0\n      },\n      {\n        \"name\": \"customer_name\",\n        \"type\": \"varchar(100)\",\n        \"index\": 1\n      },\n      {\n        \"name\": \"product_id\",\n        \"type\": \"int\",\n        \"index\": 2\n      },\n      {\n        \"name\": \"product_name\",\n        \"type\": \"varchar(100)\",\n        \"index\": 3\n      },\n      {\n        \"name\": \"price_per_item\",\n        \"type\": \"int\",\n        \"index\": 4\n      },\n      {\n        \"name\": \"quantity\",\n        \"type\": \"int\",\n        \"index\": 5\n      },\n      {\n        \"name\": \"purchase_date\",\n        \"type\": \"datetime\",\n        \"index\": 6\n      },\n      {\n        \"name\": \"payment_method\",\n        \"type\": \"varchar(50)\",\n        \"index\": 7\n      }\n    ],\n    \"pkkey\": [\n      {\n        \"columnname\": \"purchase_id\",\n        \"value\": \"105\"\n      }\n    ]\n  },\n  \"eventrow\": {\n    \"old\": \"{\\\"purchase_id\\\": \\\"105\\\", \\\"customer_name\\\": \\\"Anna Doe\\\", \\\"product_id\\\": \\\"100\\\", \\\"product_name\\\": \\\"Game 2066\\\", \\\"price_per_item\\\": \\\"50\\\", \\\"quantity\\\": \\\"2\\\", \\\"purchase_date\\\": \\\"2025-03-14 16:45:01.000\\\", \\\"payment_method\\\": \\\"Credit Card\\\"}\",\n    \"current\": \"{}\"\n  }\n}"
}

Формат атрибута данных

Данные — это объект JSON, упакованный в строковый атрибут, содержащий два атрибута:

  • eventSource
  • eventRow
"data": "{ "eventsource": {<eventSource>}, "eventdata": {<eventData>}}"

Подробные сведения об этих двух атрибутах подробно описаны в следующих разделах:

eventsource

Описывает метаданные базы данных и таблицы, в которой произошло событие:

  • db

    • Тип данных: String
    • Описание: имя базы данных, в которой находится таблица.
    • Пример: cessqldb001
  • schema

    • Тип данных: String
    • Описание: схема базы данных, содержащая таблицу.
    • Пример: dbo
  • tbl

    • Тип данных: String
    • Описание: таблица, в которой произошло событие.
    • Пример: Purchases
  • cols

    • Тип данных: Массив
    • Описание: массив, подробный сведения о столбцах в таблице.
      • имя (строка): имя столбца.
      • тип (строка): тип данных столбца (VARCHAR или INT).
      • index (целое число): индекс или положение столбца в таблице.
  • pkkey

    • Тип данных: Массив
    • Описание. Представляет столбцы первичного ключа и их значения для идентификации конкретной строки.
      • columnname (string): имя столбца, используемого в первичном ключе.
      • value (string/int/etc.): значение столбца, используемого в первичном ключе, помогает однозначно определить строку.

eventrow

Описывает изменения на уровне строк и сравнивает старые и текущие значения полей в записи.

  • old (объект, завернутый в строку): представляет значения в строке перед событием.
    • Каждая пара "ключ-значение" состоит из следующих элементов:
      • <column_name>: (строка): имя столбца.
      • <column_value>: (string/int/etc.): предыдущее значение для этого столбца.
  • current (объект, упакованный в строку): представляет обновленные значения в строке после события.
    • Как и старый объект, с каждой парой "ключ-значение", структурированной как:
      • <column_name> (строка): имя столбца.
      • <column_value> (string/int/etc.): новое или текущее значение для этого столбца.

Схема JSON CES CloudEvent

{
  "type": "record",
  "name": "ChangeEvent",
  "fields": [
    {
      "name": "specversion",
      "type": "string"
    },
    {
      "name": "type",
      "type": "string"
    },
    {
      "name": "source",
      "type": "string"
    },
    {
      "name": "id",
      "type": "string"
    },
    {
      "name": "logicalid",
      "type": "string"
    },
    {
      "name": "time",
      "type": "string"
    },
    {
      "name": "datacontenttype",
      "type": "string"
    },
    {
      "name": "operation",
      "type": "string"
    },
    {
      "name": "segmentindex",
      "type": "int"
    },
    {
      "name": "finalsegment",
      "type": "boolean"
    },
    {
      "name": "data",
      "type": "bytes"
    }
  ]
}

Схема JSON атрибута данных CES

{
  "name": "Data",
  "type": "record",
  "fields": [
    {
      "name": "eventsource",
      "type": {
        "name": "EventSource",
        "type": "record",
        "fields": [
          {
            "name": "db",
            "type": "string"
          },
          {
            "name": "schema",
            "type": "string"
          },
          {
            "name": "tbl",
            "type": "string"
          },
          {
            "name": "cols",
            "type": {
              "type": "array",
              "items": {
                "name": "Column",
                "type": "record",
                "fields": [
                  {
                    "name": "name",
                    "type": "string"
                  },
                  {
                    "name": "type",
                    "type": "string"
                  },
                  {
                    "name": "index",
                    "type": "int"
                  }
                ]
              }
            }
          },
          {
            "name": "pkkey",
            "type": {
              "type": "array",
              "items": {
                "name": "PkKey",
                "type": "record",
                "fields": [
                  {
                    "name": "columnname",
                    "type": "string"
                  },
                  {
                    "name": "value",
                    "type": "string"
                  }
                ]
              }
            }
          },
          {
            "name": "transaction",
            "type": {
              "name": "Transaction",
              "type": "record",
              "fields": [
                {
                  "name": "commitlsn",
                  "type": "string"
                },
                {
                  "name": "beginlsn",
                  "type": "string"
                },
                {
                  "name": "sequencenumber",
                  "type": "int"
                },
                {
                  "name": "committime",
                  "type": "string"
                }
              ]
            }
          }
        ]
      }
    },
    {
      "name": "eventrow",
      "type": {
        "name": "EventRow",
        "type": "record",
        "fields": [
          {
            "name": "old",
            "type": "string"
          },
          {
            "name": "current",
            "type": "string"
          }
        ]
      }
    }
  ]
}