DATAPHOS

Usage

The Schema Registry REST API

Although the Schema Registry provides a REST API for registering, updating, and fetching a schema, fetching all versions or only the latest one, deleting a schema, and more, this section showcases only the requests to register, update, and fetch a schema.

Register a schema

After the Schema Registry is deployed, you will have access to its API endpoint. To register a schema, send a POST request to the endpoint http://schema-registry-svc:8080/schemas. The request body must provide the name of the schema, description, schema_type, specification (the schema itself), compatibility_mode and validity_mode.

The compatibility type determines how the Schema Registry compares the new schema with previous versions of a schema, for a given subject. The Dataphos Schema Registry default compatibility type is BACKWARD. All the compatibility types are described in more detail in the sections below.

| Compatibility Type | Changes allowed | Check against which schemas | Upgrade first |
|---|---|---|---|
| BACKWARD | Delete fields; add optional fields | Last version | Consumers |
| BACKWARD_TRANSITIVE | Delete fields; add optional fields | All previous versions | Consumers |
| FORWARD | Add fields; delete optional fields | Last version | Producers |
| FORWARD_TRANSITIVE | Add fields; delete optional fields | All previous versions | Producers |
| FULL | Add optional fields; delete optional fields | Last version | Any order |
| FULL_TRANSITIVE | Add optional fields; delete optional fields | All previous versions | Any order |
| NONE | All changes are accepted | Compatibility checking disabled | Depends |
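
The table can be read as a small lookup: each mode fixes which earlier versions a new schema is compared against and which side should upgrade first. The following is a minimal Python sketch of that lookup. The names COMPATIBILITY_MODES and versions_to_check are hypothetical and are not part of the Schema Registry API.

```python
# Illustrative lookup derived from the compatibility table.
# These names are hypothetical, not part of the Schema Registry API.
COMPATIBILITY_MODES = {
    # mode: (which versions are checked, who upgrades first)
    "BACKWARD": ("last", "Consumers"),
    "BACKWARD_TRANSITIVE": ("all", "Consumers"),
    "FORWARD": ("last", "Producers"),
    "FORWARD_TRANSITIVE": ("all", "Producers"),
    "FULL": ("last", "Any order"),
    "FULL_TRANSITIVE": ("all", "Any order"),
    "NONE": ("none", "Depends"),
}


def versions_to_check(mode, previous_versions):
    """Return the earlier versions a new schema would be compared against."""
    scope, _upgrade_first = COMPATIBILITY_MODES[mode]
    if scope == "none":
        return []  # compatibility checking disabled
    if scope == "last":
        return list(previous_versions[-1:])  # only the latest version
    return list(previous_versions)  # transitive: every previous version


checked = versions_to_check("BACKWARD", ["1", "2", "3"])
```

With three registered versions, a non-transitive mode selects only the latest one, while a transitive mode selects all three.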

The validity type determines how strict the Schema Registry is when registering a schema, that is, whether it demands that the schema complies with the rules of the data format or with the schema rules. The Dataphos Schema Registry default validity type is FULL. Possible values for the validity mode are FULL, NONE and SYNTAX_ONLY.

{
    "description": "new json schema for testing", 
    "schema_type": "json", 
    "specification":  "{\r\n  \"$id\": \"https://example.com/person.schema.json\",\r\n  \"$schema\": \"https://json-schema.org/draft/2020-12/schema\",\r\n  \"title\": \"Person\",\r\n  \"type\": \"object\",\r\n  \"properties\": {\r\n    \"firstName\": {\r\n      \"type\": \"string\",\r\n      \"description\": \"The person's first name.\"\r\n    },\r\n    \"lastName\": {\r\n      \"type\": \"string\",\r\n      \"description\": \"The person's last name.\"\r\n    },\r\n    \"age\": {\r\n      \"description\": \"Age in years which must be equal to or greater than zero.\",\r\n      \"type\": \"integer\",\r\n      \"minimum\": 0\r\n    }\r\n  }\r\n}\r\n",
    "name": "schema json",
    "compatibility_mode": "BACKWARD",
    "validity_mode": "FULL"
}

Using curl:

curl -XPOST -H "Content-type: application/json" -d '{
    "description": "new json schema for testing", 
    "schema_type": "json", 
    "specification":  "{\r\n  \"$id\": \"https://example.com/person.schema.json\",\r\n  \"$schema\": \"https://json-schema.org/draft/2020-12/schema\",\r\n  \"title\": \"Person\",\r\n  \"type\": \"object\",\r\n  \"properties\": {\r\n    \"firstName\": {\r\n      \"type\": \"string\",\r\n      \"description\": \"The person'\''s first name.\"\r\n    },\r\n    \"lastName\": {\r\n      \"type\": \"string\",\r\n      \"description\": \"The person'\''s last name.\"\r\n    },\r\n    \"age\": {\r\n      \"description\": \"Age in years which must be equal to or greater than zero.\",\r\n      \"type\": \"integer\",\r\n      \"minimum\": 0\r\n    }\r\n  }\r\n}\r\n",
    "name": "schema json",
    "compatibility_mode": "BACKWARD",
    "validity_mode": "FULL"
}' 'http://schema-registry-svc:8080/schemas/'
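
The same request can be prepared from code. The sketch below uses only the Python standard library and assumes the base URL and field names shown in the examples above. The helper name build_register_request is hypothetical. It builds the request without sending it, so it can be inspected first.

```python
import json
import urllib.request

# Assumed base URL, copied from the examples in this section.
BASE_URL = "http://schema-registry-svc:8080"


def build_register_request(name, description, schema_type, specification,
                           compatibility_mode="BACKWARD", validity_mode="FULL"):
    """Build (but do not send) the POST request that registers a schema."""
    payload = {
        "name": name,
        "description": description,
        "schema_type": schema_type,
        # The specification travels as a string, not as a nested JSON object.
        "specification": specification,
        "compatibility_mode": compatibility_mode,
        "validity_mode": validity_mode,
    }
    return urllib.request.Request(
        BASE_URL + "/schemas",
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


person_schema = json.dumps({"title": "Person", "type": "object"})
request = build_register_request(
    "schema json", "new json schema for testing", "json", person_schema
)
# Sending requires a reachable registry:
# with urllib.request.urlopen(request) as response:
#     print(response.status, response.read().decode("utf-8"))
```

Serializing the schema with json.dumps before placing it in the payload avoids hand-escaping quotes and line breaks in the specification string.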

The response to the schema registration request will be:

  • STATUS 201 Created

    {
        "identification": "32",
        "version": "1",
        "message": "schema successfully created"
    }
    
  • STATUS 409 Conflict -> indicating that the schema already exists

    {
        "identification": "32",
        "version": "1",
        "message": "schema already exists at id=32"
    }
    
  • STATUS 500 Internal Server Error -> indicating a server error, which means that either the request is incorrect (for example, missing fields) or the server is down.

    {
        "message": "Internal Server Error"
    }
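
A client usually needs to branch on these three outcomes. The sketch below is one possible way to do that, assuming the status codes and body fields listed above. The function name interpret_register_response and the outcome labels are hypothetical.

```python
import json


def interpret_register_response(status, body_text):
    """Map a registration response to an (outcome, schema_id, version) tuple."""
    body = json.loads(body_text) if body_text else {}
    if status == 201:
        # Schema was created; the body carries the new ID and version.
        return ("created", body.get("identification"), body.get("version"))
    if status == 409:
        # Schema already exists; the body points at the existing ID and version.
        return ("exists", body.get("identification"), body.get("version"))
    if status == 500:
        # Malformed request or server down; no identification is returned.
        return ("server_error", None, None)
    return ("unexpected", None, None)


outcome = interpret_register_response(
    201,
    '{"identification": "32", "version": "1", "message": "schema successfully created"}',
)
```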
    

Update a schema

After a schema is registered, you can update it by registering a new version under that schema ID. To update a schema, send a PUT request to the endpoint http://schema-registry-svc:8080/schemas/<schema-id>. The request body must provide the specification (the schema) and, optionally, a description of the version.

{
    "description": "added field for middle name",
    "specification": "{\r\n  \"$id\": \"https://example.com/person.schema.json\",\r\n  \"$schema\": \"https://json-schema.org/draft/2020-12/schema\",\r\n  \"title\": \"Person\",\r\n  \"type\": \"object\",\r\n  \"properties\": {\r\n    \"firstName\": {\r\n      \"type\": \"string\",\r\n      \"description\": \"The person's first name.\"\r\n    },\r\n    \"middleName\": {\r\n      \"type\": \"string\",\r\n      \"description\": \"The person's middle name.\"\r\n    },\r\n    \"lastName\": {\r\n      \"type\": \"string\",\r\n      \"description\": \"The person's last name.\"\r\n    },\r\n    \"age\": {\r\n      \"description\": \"Age in years which must be equal to or greater than zero.\",\r\n      \"type\": \"integer\",\r\n      \"minimum\": 0\r\n    }\r\n  }\r\n}\r\n"
}

Using curl:

curl -XPUT -H "Content-type: application/json" -d '{
    "description": "added field for middle name",
    "specification": "{\r\n  \"$id\": \"https://example.com/person.schema.json\",\r\n  \"$schema\": \"https://json-schema.org/draft/2020-12/schema\",\r\n  \"title\": \"Person\",\r\n  \"type\": \"object\",\r\n  \"properties\": {\r\n    \"firstName\": {\r\n      \"type\": \"string\",\r\n      \"description\": \"The person'\''s first name.\"\r\n    },\r\n    \"middleName\": {\r\n      \"type\": \"string\",\r\n      \"description\": \"The person'\''s middle name.\"\r\n    },\r\n    \"lastName\": {\r\n      \"type\": \"string\",\r\n      \"description\": \"The person'\''s last name.\"\r\n    },\r\n    \"age\": {\r\n      \"description\": \"Age in years which must be equal to or greater than zero.\",\r\n      \"type\": \"integer\",\r\n      \"minimum\": 0\r\n    }\r\n  }\r\n}\r\n"
}' 'http://schema-registry-svc:8080/schemas/<schema-id>'

The response to the update request is the same as for registration, except that a successful update returns status 200 OK together with the new version number.

{
    "identification": "32",
    "version": "2",
    "message": "schema successfully updated"
}
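
Because the description is optional on update, a client can leave it out of the body entirely. The following sketch, again using only the Python standard library, assumes the endpoint and field names shown above. The helper name build_update_request is hypothetical.

```python
import json
import urllib.request

# Assumed base URL, copied from the examples in this section.
BASE_URL = "http://schema-registry-svc:8080"


def build_update_request(schema_id, specification, description=None):
    """Build (but do not send) the PUT request that adds a new schema version."""
    payload = {"specification": specification}
    if description is not None:
        # The description is optional, so it is only sent when provided.
        payload["description"] = description
    return urllib.request.Request(
        f"{BASE_URL}/schemas/{schema_id}",
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="PUT",
    )


update = build_update_request(
    "32",
    json.dumps({"title": "Person", "type": "object"}),
    description="added field for middle name",
)
```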

Fetch a schema version

To get a schema version and its relevant details, send a GET request to the following endpoint:

http://schema-registry-svc:8080/schemas/<schema-id>/versions/<schema-version>

Using curl:

curl -XGET -H "Content-type: application/json" 'http://schema-registry-svc:8080/schemas/<schema-id>/versions/<schema-version>' 

The response to the fetch request will be:

  • STATUS 200 OK
    {
        "id": "32",
        "version": "1",
        "schema_id": "32",
        "specification": "ew0KICAiJHNjaGVtYSI6ICJodHRwOi8vanNvbi1zY2hlbWEub3JnL2RyYWZ0LTA3L3NjaGVtYSIsDQogICJ0eXBlIjogIm9iamVjdCIsDQogICJ0aXRsZSI6ICJUaGUgUm9vdCBTY2hlbWEiLA0KICAiZGVzY3JpcHRpb24iOiAiVGhlIHJvb3Qgc2NoZW1hIGNvbXByaXNlcyB0aGUgZW50aXJlIEpTT04gZG9jdW1lbnQuIiwNCiAgImRlZmF1bHQiOiB7fSwNCiAgImFkZGl0aW9uYWxQcm9wZXJ0aWVzIjogdHJ1ZSwNCiAgInJlcXVpcmVkIjogWw0KICAgICJwaG9uZSINCiAgXSwNCiAgInByb3BlcnRpZXMiOiB7DQogICAgInBob25lIjogew0KICAgICAgInR5cGUiOiAiaW50ZWdlciIsDQogICAgICAidGl0bGUiOiAiVGhlIFBob25lIFNjaGVtYSIsDQogICAgICAiZGVzY3JpcHRpb24iOiAiQW4gZXhwbGFuYXRpb24gYWJvdXQgdGhlIHB1cnBvc2Ugb2YgdGhpcyBpbnN0YW5jZS4iLA0KICAgICAgImRlZmF1bHQiOiAiIiwNCiAgICAgICJleGFtcGxlcyI6IFsNCiAgICAgICAgMQ0KICAgICAgXQ0KICAgIH0sDQogICAgInJvb20iOiB7DQogICAgICAidHlwZSI6ICJpbnRlZ2VyIiwNCiAgICAgICJ0aXRsZSI6ICJUaGUgUm9vbSBTY2hlbWEiLA0KICAgICAgImRlc2NyaXB0aW9uIjogIkFuIGV4cGxhbmF0aW9uIGFib3V0IHRoZSBwdXJwb3NlIG9mIHRoaXMgaW5zdGFuY2UuIiwNCiAgICAgICJkZWZhdWx0IjogIiIsDQogICAgICAiZXhhbXBsZXMiOiBbDQogICAgICAgIDEyMw0KICAgICAgXQ0KICAgIH0NCiAgfQ0KfQ==",
        "description": "new json schema for testing",
        "schema_hash": "72966008fdcec8627a0e43c5d9a247501fc4ab45687dd2929aebf8ef3eb06ccd",
        "created_at": "2023-05-09T08:38:54.5515Z",
        "autogenerated": false
    }
    
  • STATUS 404 Not Found -> indicating that the wrong schema ID or schema version was provided
  • STATUS 500 Internal Server Error -> indicating a server error, which means that either the request is incorrect (for example, a wrong endpoint) or the server is down.
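
In the sample 200 OK response above, the specification field appears to be base64-encoded rather than returned as plain text. A minimal sketch for recovering the schema text, assuming that encoding and UTF-8 content, could look like this. The function name decode_specification is hypothetical.

```python
import base64
import json


def decode_specification(version_response):
    """Return the schema text from a fetched version, assuming base64 + UTF-8."""
    raw = base64.b64decode(version_response["specification"])
    return raw.decode("utf-8")


# A small stand-in for a fetched version, built locally for illustration.
sample_response = {
    "id": "32",
    "version": "1",
    "specification": base64.b64encode(b'{"type": "object"}').decode("ascii"),
}
schema_text = decode_specification(sample_response)
schema = json.loads(schema_text)  # valid only when the schema type is JSON
```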

Other requests

| Description | Method | URL | Headers | Body |
|---|---|---|---|---|
| Get all the schemas | GET | http://schema-registry-svc/schemas | Content-Type: application/json | This request does not have a body |
| Get all the schema versions of the specified ID | GET | http://schema-registry-svc/schemas/{id}/versions | Content-Type: application/json | This request does not have a body |
| Get the latest schema version of the specified ID | GET | http://schema-registry-svc/schemas/{id}/versions/latest | Content-Type: application/json | This request does not have a body |
| Get schema specification by id and version | GET | http://schema-registry-svc/schemas/{id}/versions/{version}/spec | Content-Type: application/json | This request does not have a body |
| Delete the schema under the ID | DELETE | http://schema-registry-svc/schemas/{id} | Content-Type: application/json | This request does not have a body |
| Delete the schema by id and version | DELETE | http://schema-registry-svc/schemas/{id}/versions/{version} | Content-Type: application/json | This request does not have a body |
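
These requests differ only in method and path, so a client can keep them in one lookup. The sketch below is one way to do that. The operation names, the ROUTES table and the route helper are hypothetical. The paths and the default base URL are copied from the table.

```python
# Hypothetical operation names mapped to the methods and paths from the table.
ROUTES = {
    "list_schemas": ("GET", "/schemas"),
    "list_versions": ("GET", "/schemas/{id}/versions"),
    "latest_version": ("GET", "/schemas/{id}/versions/latest"),
    "get_spec": ("GET", "/schemas/{id}/versions/{version}/spec"),
    "delete_schema": ("DELETE", "/schemas/{id}"),
    "delete_version": ("DELETE", "/schemas/{id}/versions/{version}"),
}


def route(operation, base_url="http://schema-registry-svc", **params):
    """Return the (method, url) pair for an operation, filling in path parameters."""
    method, template = ROUTES[operation]
    return method, base_url + template.format(**params)


method, url = route("get_spec", id="32", version="1")
```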

Worker message format

Depending on the technology your producer uses, the way you shape the message may differ and therefore the part of the message that contains the metadata might be called attributes, metadata, etc.

Besides the data field, which contains the message data, the attributes (or metadata) structure must contain the fields schemaId, versionId and format, which carry the information the worker component relies on. If additional attributes are provided, the worker does not drop them; they are forwarded to the destination topic.

{
  "ID": "string",
  "Data": "string",
  "Attributes": {
    "schemaId": "string",
    "versionId": "string",
    "format": "string",
    // ...
  },
  "PublishTime": "time"
}

| Field | Description |
|---|---|
| Data | string (bytes format). The message data field. If this field is empty, the message must contain at least one attribute. A base64-encoded string. |
| Attributes | map (key: string, value: string). Attributes for this message. If this field is empty, the message must contain non-empty data. This can be used to filter messages on the subscription. An object containing a list of "key": value pairs. Example: { "schemaId": "1", "versionId": "2", "format": "json" }. |
| PublishTime | time (time.Time format). PublishTime is the time at which the message was published. This is populated by the server for Messages obtained from a subscription. |

{
  "MessageID": "string",
  "Body": "string",
  "PartitionKey": "string", 
  "ApplicationProperties": {
    "schemaId": "string",
    "versionId": "string",
    "format": "string",
    // ...
  },
  "EnqueuedTime": "time"
}

| Field | Description |
|---|---|
| Body | string (bytes format). The message data field. If this field is empty, the message must contain at least one application property. |
| ApplicationProperties | map (key: string, value: string). Attributes for this message. ApplicationProperties can be used to store custom metadata for a message. An object containing a list of "key": value pairs. Example: { "schemaId": "1", "versionId": "2", "format": "json" }. |
| PartitionKey | string. PartitionKey is used with a partitioned entity and enables assigning related messages to the same internal partition. This ensures that the submission sequence order is correctly recorded. The partition is chosen by a hash function in Service Bus and cannot be chosen directly. |
| EnqueuedTime | time (time.Time format). EnqueuedTime is the UTC time when the message was accepted and stored by Service Bus. |

{
  "Key": "string", 
  "Value": "string", 
  "Offset": "int64",
  "Partition": "int32",
  "Headers": {
    "schemaId": "string",
    "versionId": "string",
    "format": "string",
    // ...
  },
  "Timestamp": "time"
}

| Field | Description |
|---|---|
| Key | string (bytes format). Key is an optional field that can be used for partition assignment. |
| Value | string (bytes format). Value is blob of data to write to Kafka. |
| Offset | int64. Offset is the offset that a record is written as. |
| Partition | int32. Partition is the partition that a record is written to. |
| Headers | map (key: string, value: string). Headers are optional key/value pairs that are passed along with records. Example: { "schemaId": "1", "versionId": "2", "format": "json" }. These are purely for producers and consumers; Kafka does not look at this field and only writes it to disk. |
| Timestamp | time (time.Time format). Timestamp is the timestamp that will be used for this record. Record batches are always written with "CreateTime", meaning that timestamps are generated by clients rather than brokers. |
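
Whichever broker is used, the producer has to attach the same three metadata fields next to the payload. The sketch below shows one way to build and check that metadata map before handing it to a broker client. The helper names build_attributes and missing_keys are hypothetical, and no broker library is involved.

```python
# The three metadata fields the worker component expects, per this section.
REQUIRED_KEYS = ("schemaId", "versionId", "format")


def build_attributes(schema_id, version_id, data_format, extra=None):
    """Merge the required fields with any additional string attributes."""
    attributes = dict(extra or {})
    # Values are strings because the metadata maps are string-to-string.
    attributes.update({
        "schemaId": str(schema_id),
        "versionId": str(version_id),
        "format": data_format,
    })
    return attributes


def missing_keys(attributes):
    """List the required fields that are absent from a metadata map."""
    return [key for key in REQUIRED_KEYS if key not in attributes]


attributes = build_attributes(1, 2, "json", extra={"tenant": "demo"})
```

The resulting map can be passed as Attributes, ApplicationProperties or Headers, depending on the message format in use.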