Usage
Even thought the Schema Registry provides REST API for registering, updating, fetching a schema, fetching all the versions, fetching the latest, deleting a schema, etc. We will showcase here only the requests to register, update and fetch a schema.
After the Schema Registry is deployed you will have access to its API endpoint. To register a schema, you have to send a
POST request to the endpoint http://schema-registry-svc:8080/schemas
in whose body you need to provide the name of the
schema, description, schema_type, specification (the schema), compatibility and validity mode.
The compatibility type determines how the Schema Registry compares the new schema with previous versions of a schema,
for a given subject. The Dataphos Schema Registry default compatibility type is BACKWARD
. All the compatibility
types are described in more detail in the sections below.
Compatibility Type | Changes allowed | Check against which schemas | Upgrade first | Description |
---|---|---|---|---|
BACKWARD | Delete fields Add optional fields |
Last version | Consumers | Being able to understand messages from the last schema and the current schema. |
BACKWARD_TRANSITIVE | Delete fields Add optional fields |
All previous versions | Consumers | Being able to understand messages from all the previous schema versions and the current schema. |
FORWARD | Add fields Delete optional fields |
Last version | Producers | Being able to understand messages from the current schema and the next schema. |
FORWARD_TRANSITIVE | Add fields Delete optional fields |
All previous versions | Producers | Being able to understand messages from the current schema and all the next schema versions. |
FULL | Add optional fields Delete optional fields |
Last version | Any order | Being both backward and forward compatible. |
FULL_TRANSITIVE | Add optional fields Delete optional fields |
All previous versions | Any order | Being both backward_transitive and forward_transitive compatible. |
NONE | All changes are accepted | Compatibility checking disabled | Depends | All changes in the messages are acceptible. |
The validity type determines how strict the Schema Registry will be when registering a schema. Meaning, will it demand
that the schema is compliant with the rules of the data format or with the schema rules.
The Dataphos Schema Registry default validity type is FULL
. Possible values for the validity mode are: FULL
,
NONE
, SYNTAX_ONLY
.
{
"description": "new json schema for testing",
"schema_type": "json",
"specification": "{\r\n \"$id\": \"https://example.com/person.schema.json\",\r\n \"$schema\": \"https://json-schema.org/draft/2020-12/schema\",\r\n \"title\": \"Person\",\r\n \"type\": \"object\",\r\n \"properties\": {\r\n \"firstName\": {\r\n \"type\": \"string\",\r\n \"description\": \"The person's first name.\"\r\n },\r\n \"lastName\": {\r\n \"type\": \"string\",\r\n \"description\": \"The person's last name.\"\r\n },\r\n \"age\": {\r\n \"description\": \"Age in years which must be equal to or greater than zero.\",\r\n \"type\": \"integer\",\r\n \"minimum\": 0\r\n }\r\n }\r\n}\r\n",
"name": "schema json",
"compatibility_mode": "BACKWARD",
"validity_mode": "FULL"
}
Using curl:
curl -XPOST -H "Content-type: application/json" -d '{
"description": "new json schema for testing",
"schema_type": "json",
"specification": "{\r\n \"$id\": \"https://example.com/person.schema.json\",\r\n \"$schema\": \"https://json-schema.org/draft/2020-12/schema\",\r\n \"title\": \"Person\",\r\n \"type\": \"object\",\r\n \"properties\": {\r\n \"firstName\": {\r\n \"type\": \"string\",\r\n \"description\": \"The person's first name.\"\r\n },\r\n \"lastName\": {\r\n \"type\": \"string\",\r\n \"description\": \"The person's last name.\"\r\n },\r\n \"age\": {\r\n \"description\": \"Age in years which must be equal to or greater than zero.\",\r\n \"type\": \"integer\",\r\n \"minimum\": 0\r\n }\r\n }\r\n}\r\n",
"name": "schema json",
"compatibility_mode": "BACKWARD",
"validity_mode": "FULL"
}' 'http://schema-registry-svc:8080/schemas/'
The response to the schema registration request will be:
-
STATUS 201 Created
{ "identification": "32", "version": "1", "message": "schema successfully created" }
-
STATUS 409 Conflict -> indicating that the schema already exists
{ "identification": "32", "version": "1", "message": "schema already exists at id=32" }
-
STATUS 500 Internal Server Error -> indicating a server error, which means that either the request is not correct ( missing fields) or that the server is down.
{ "message": "Internal Server Error" }
After the Schema Registry is registered you can update it by registering a new version under that schema ID. To update a
schema, you have to send a PUT request to the endpoint http://schema-registry-svc:8080/schemas/<schema_ID>
in whose body
you need to provide the description (optional) of the version and the specification (the schema)
{
"description": "added field for middle name",
"specification": "{\r\n \"$id\": \"https://example.com/person.schema.json\",\r\n \"$schema\": \"https://json-schema.org/draft/2020-12/schema\",\r\n \"title\": \"Person\",\r\n \"type\": \"object\",\r\n \"properties\": {\r\n \"firstName\": {\r\n \"type\": \"string\",\r\n \"description\": \"The person's first name.\"\r\n },\r\n \"lastName\": {\r\n \"type\": \"string\",\r\n \"description\": \"The person's last name.\"\r\n },\r\n \"lastName\": {\r\n \"type\": \"string\",\r\n \"description\": \"The person's last name.\"\r\n },\r\n \"age\": {\r\n \"description\": \"Age in years which must be equal to or greater than zero.\",\r\n \"type\": \"integer\",\r\n \"minimum\": 0\r\n }\r\n }\r\n}\r\n"
}
Using curl:
curl -XPUT -H "Content-type: application/json" -d '{
"description": "added field for middle name",
"specification": "{\r\n \"$id\": \"https://example.com/person.schema.json\",\r\n \"$schema\": \"https://json-schema.org/draft/2020-12/schema\",\r\n \"title\": \"Person\",\r\n \"type\": \"object\",\r\n \"properties\": {\r\n \"firstName\": {\r\n \"type\": \"string\",\r\n \"description\": \"The person's first name.\"\r\n },\r\n \"lastName\": {\r\n \"type\": \"string\",\r\n \"description\": \"The person's last name.\"\r\n },\r\n \"lastName\": {\r\n \"type\": \"string\",\r\n \"description\": \"The person's last name.\"\r\n },\r\n \"age\": {\r\n \"description\": \"Age in years which must be equal to or greater than zero.\",\r\n \"type\": \"integer\",\r\n \"minimum\": 0\r\n }\r\n }\r\n}\r\n"
}' 'http://schema-registry-svc:8080/schemas/<schema-id>'
The response to the schema updating request will be the same as for registering except when the updating is done successfully it will be status 200 OK and a new version will be provided.
{
"identification": "32",
"version": "2",
"message": "schema successfully updated"
}
To get a schema version and its relevant details, a GET request needs to be made and the endpoint needs to be:
http://schema-registry-svc:8080/schemas/<schema-id>/versions/<schema-version>
Using curl:
curl -XGET -H "Content-type: application/json" 'http://schema-registry-svc:8080/schemas/<schema-id>/versions/<schema-version>'
The response to the schema registration request will be:
- STATUS 200 OK
{ "id": "32", "version": "1", "schema_id": "32", "specification": "ew0KICAiJHNjaGVtYSI6ICJodHRwOi8vanNvbi1zY2hlbWEub3JnL2RyYWZ0LTA3L3NjaGVtYSIsDQogICJ0eXBlIjogIm9iamVjdCIsDQogICJ0aXRsZSI6ICJUaGUgUm9vdCBTY2hlbWEiLA0KICAiZGVzY3JpcHRpb24iOiAiVGhlIHJvb3Qgc2NoZW1hIGNvbXByaXNlcyB0aGUgZW50aXJlIEpTT04gZG9jdW1lbnQuIiwNCiAgImRlZmF1bHQiOiB7fSwNCiAgImFkZGl0aW9uYWxQcm9wZXJ0aWVzIjogdHJ1ZSwNCiAgInJlcXVpcmVkIjogWw0KICAgICJwaG9uZSINCiAgXSwNCiAgInByb3BlcnRpZXMiOiB7DQogICAgInBob25lIjogew0KICAgICAgInR5cGUiOiAiaW50ZWdlciIsDQogICAgICAidGl0bGUiOiAiVGhlIFBob25lIFNjaGVtYSIsDQogICAgICAiZGVzY3JpcHRpb24iOiAiQW4gZXhwbGFuYXRpb24gYWJvdXQgdGhlIHB1cnBvc2Ugb2YgdGhpcyBpbnN0YW5jZS4iLA0KICAgICAgImRlZmF1bHQiOiAiIiwNCiAgICAgICJleGFtcGxlcyI6IFsNCiAgICAgICAgMQ0KICAgICAgXQ0KICAgIH0sDQogICAgInJvb20iOiB7DQogICAgICAidHlwZSI6ICJpbnRlZ2VyIiwNCiAgICAgICJ0aXRsZSI6ICJUaGUgUm9vbSBTY2hlbWEiLA0KICAgICAgImRlc2NyaXB0aW9uIjogIkFuIGV4cGxhbmF0aW9uIGFib3V0IHRoZSBwdXJwb3NlIG9mIHRoaXMgaW5zdGFuY2UuIiwNCiAgICAgICJkZWZhdWx0IjogIiIsDQogICAgICAiZXhhbXBsZXMiOiBbDQogICAgICAgIDEyMw0KICAgICAgXQ0KICAgIH0NCiAgfQ0KfQ==", "description": "new json schema for testing", "schema_hash": "72966008fdcec8627a0e43c5d9a247501fc4ab45687dd2929aebf8ef3eb06ccd", "created_at": "2023-05-09T08:38:54.5515Z", "autogenerated": false }
- STATUS 404 Not Found -> indicating that the wrong schema ID or schema version was provided
- STATUS 500 Internal Server Error -> indicating a server error, which means that either the request is not correct ( wrong endpoint) or that the server is down.
Description | Method | URL | Headers | Body |
---|---|---|---|---|
Get all the schemas | GET | http://schema-registry-svc/schemas | Content-Type: application/json | This request does not have a body |
Get all the schema versions of the specified ID | GET | http://schema-registry-svc/schemas/{id}/versions | Content-Type: application/json | This request does not have a body |
Get the latest schema version of the specified ID | GET | http://schema-registry-svc/schemas/{id}/versions/latest | Content-Type: application/json | This request does not have a body |
Get schema specification by id and version | GET | http://schema-registry-svc/schemas/{id}/versions/{version}/spec | Content-Type: application/json |
This request does not have a body |
Delete the schema under the ID | DELETE | http://schema-registry-svc/schemas/{id} | Content-Type: application/json | This request does not have a body |
Delete the schema by id and version | DELETE | http://schema-registry-svc/schemas/{id}/versions/{version} | Content-Type: application/json | This request does not have a body |
Aside from fetching schemas by their ID and version, they can also be fetched using search endpoint. Schemas can be searched on the “/schemas/search?” endpoint, following the search condition. There can be multiple criteria for the search, and they are in the following format: par1=val1&par2=val2&par3=val3. The parameters that schema can be searched upon are as follows:
- id
- version
- type
- name
- attributes
Additionally, they can be ordered by these parameters (except for attributes) in ascending/descending order (the default parameter to order by is ID), as well as limited to a certain number of items. The table below shows a few example of schema search requests (none have body).
Description | Method | URL | Headers |
---|---|---|---|
Get all schemas that contain schema_name in their name (case sensitive) | GET | http://schema-registry-svc/schemas/search?name=schema_name | Content-Type: application/json |
Get all schemas that contain schema_name in their name and type json | GET | http://schema-registry-svc/schemas/search?name=schema_name&type=json | Content-Type: application/json |
Get a schema with an ID | GET | http://schema-registry-svc/schemas/search?id=ID | Content-Type: application/json |
Get a schema with an ID in descending order | GET | http://schema-registry-svc/schemas/search?id=ID&sort=desc | Content-Type: application/json |
Get up to 50 schemas whose name schema_name in ascending order in respect to their names | GET | http://schema-registry-svc/schemas/search?id=schema_name&orderBy=name&sort=asc&limit=50 | Content-Type: application/json |
Get a schema whose name contains schema_name and attributes attr1 and attr2 | GET | http://schema-registry-svc/schemas/search?name=schema_name&attributes=attr1,attr2 | Content-Type: application/json |
Depending on the technology your producer uses, the way you shape the message may differ and therefore the part of the
message that contains the metadata might be called attributes
, metadata,
etc.
Besides the data field, which contains the message data, inside the attributes (or metadata) structure it’s important to
add fields schemaId
, versionId
and format
which are important information for the validator component. In case some additional attributes are provided, the validator
won’t lose them, they will be delegated to the destination topic.
{
"ID": "string",
"Data": "string",
"Attributes": {
"schemaId": "string",
"versionId": "string",
"format": "string",
// ...
},
"PublishTime": "time",
}
Field | Description |
---|---|
Data | string (bytes format) The message data field. If this field is empty, the message must contain at least one attribute. A base64-encoded string. |
Attributes | map (key: string, value: string) Attributes for this message. If this field is empty, the message must contain non-empty data. This can be used to filter messages on the subscription. An object containing a list of “key”: value pairs. Example: { “schemaId”: “1”, “versionId”: “2”, “format”: “json” }. |
PublishTime | time (time.Time format) PublishTime is the time at which the message was published. This is populated by the server for Messages obtained from a subscription. |
{
"MessageID": "string",
"Body": "string",
"PartitionKey": "string",
"ApplicationProperties": {
"schemaId": "string",
"versionId": "string",
"format": "string",
// ...
},
"EnqueuedTime": "time"
}
Field | Description |
---|---|
Body | string (bytes format) The message data field. If this field is empty, the message must contain at least one application property. |
ApplicationProperties | map (key: string, value: string) Attributes for this message. ApplicationProperties can be used to store custom metadata for a message. An object containing a list of “key”: value pairs. Example: { “schemaId”: “1”, “versionId”: “2”, “format”: “json” }. |
PartitionKey | string PartitionKey is used with a partitioned entity and enables assigning related messages to the same internal partition. This ensures that the submission sequence order is correctly recorded. The partition is chosen by a hash function in Service Bus and cannot be chosen directly. |
EnqueuedTime | time (time.Time format) EnqueuedTime is the UTC time when the message was accepted and stored by Service Bus. |
{
"Key": "string",
"Value": "string",
"Offset": "int64",
"Partition": "int32",
"Headers": {
"schemaId": "string",
"versionId": "string",
"format": "string",
// ...
},
"Timestamp": "time"
}
Field | Description |
---|---|
Key | string (bytes format) Key is an optional field that can be used for partition assignment. |
Value | string (bytes format) Value is blob of data to write to Kafka. |
Offset | int64 Offset is the offset that a record is written as. |
Partition | int32 Partition is the partition that a record is written to. |
Headers | map (key: string, value: string) Headers are optional key/value pairs that are passed along with records. Example: { “schemaId”: “1”, “versionId”: “2”, “format”: “json” }. These are purely for producers and consumers; Kafka does not look at this field and only writes it to disk. |
Timestamp | time (time.Time format) Timestamp is the timestamp that will be used for this record. Record batches are always written with “CreateTime”, meaning that timestamps are generated by clients rather than brokers. |