
Usage

Typical usage consists of uploading one or more schemas to the registry, encoding data using the registered schemas, and/or decoding encoded data by getting the schemas from the registry.
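As a sketch, the whole round trip can look like this (assuming a registry running on localhost and the Avro schema used in the examples below):

```javascript
const { SchemaRegistry, SchemaType } = require('@kafkajs/confluent-schema-registry')

const registry = new SchemaRegistry({ host: 'http://localhost:8081' })

const run = async () => {
  // 1. Upload a schema; the registry responds with a schema id
  const { id } = await registry.register({
    type: SchemaType.AVRO,
    schema: `{
      "type": "record",
      "name": "RandomTest",
      "namespace": "examples",
      "fields": [{ "type": "string", "name": "fullName" }]
    }`,
  })

  // 2. Encode a payload using that schema id
  const buffer = await registry.encode(id, { fullName: 'John Doe' })

  // 3. Decode it again; the schema is fetched from the registry as needed
  const payload = await registry.decode(buffer)
}
```

Each step is covered in detail in the sections below.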

Creating the registry client

const { SchemaRegistry } = require('@kafkajs/confluent-schema-registry')

const registry = new SchemaRegistry({ host: 'http://localhost:8081' })

For more configuration options, see configuration.

Uploading schemas

Schemas can be registered with the schema registry using registry.register({ type: SchemaType, schema: string }), which resolves to an object containing the schema id. This id is used later when encoding.

const schema = {
  type: SchemaType.AVRO | SchemaType.JSON | SchemaType.PROTOBUF,
  schema: "string"
}

const options = {
  subject: "string"
}

await registry.register(schema, options)

Avro

const schema = `
{
  "type": "record",
  "name": "RandomTest",
  "namespace": "examples",
  "fields": [{ "type": "string", "name": "fullName" }]
}
`
const { id } = await registry.register({ type: SchemaType.AVRO, schema })

To simplify working with Avro schemas and integrating with existing tooling, some utility functions are available. Schemas can be defined in either AVSC or AVDL format, and are read using readAVSCAsync and avdlToAVSCAsync respectively.

Note that these functions return objects rather than strings, but they can be passed directly to register as the schema argument and will be stringified internally.

const { SchemaType, readAVSCAsync, avdlToAVSCAsync } = require('@kafkajs/confluent-schema-registry')

// From an avsc file
const schema = await readAVSCAsync('path/to/schema.avsc')
const { id } = await registry.register({ type: SchemaType.AVRO, schema }) // { id: 2 }

// From an avdl file
const schema = await avdlToAVSCAsync('path/to/protocol.avdl')
const { id } = await registry.register({ type: SchemaType.AVRO, schema }) // { id: 3 }

Subject

For Avro schemas, the subject is automatically inferred from the schema if options.subject is not set.

See Subjects for more information on subjects.
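To illustrate the default behavior: the inferred subject is the schema's namespace and name joined by a separator (a dot by default). A minimal sketch of that rule, using a hypothetical helper that is not part of the library's API:

```javascript
// Hypothetical helper mirroring the default Avro subject inference:
// subject = <namespace><separator><name>
function inferSubject(avscJson, separator = '.') {
  const { namespace, name } = JSON.parse(avscJson)
  return namespace ? `${namespace}${separator}${name}` : name
}

const schema = `{
  "type": "record",
  "name": "RandomTest",
  "namespace": "examples",
  "fields": [{ "type": "string", "name": "fullName" }]
}`

inferSubject(schema) // 'examples.RandomTest'
```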

JSON Schema

const { SchemaType } = require('@kafkajs/confluent-schema-registry')

const schema = `
{
  "definitions": {
    "record:examples.Person": {
      "type": "object",
      "required": ["fullName"],
      "additionalProperties": false,
      "properties": {
        "fullName": {
          "type": "string"
        }
      }
    }
  },
  "$ref": "#/definitions/record:examples.Person"
}
`
const { id } = await registry.register({ type: SchemaType.JSON, schema })

Protobuf

const { SchemaType } = require('@kafkajs/confluent-schema-registry')

const schema = `
package examples;
message RandomTest {
  required string fullName = 1;
}
`
const { id } = await registry.register({ type: SchemaType.PROTOBUF, schema })

Compatibility

The compatibility of the schema will be whatever the global default is (typically BACKWARD). You can override this for a specific subject by setting it like so:

const {
  COMPATIBILITY: { NONE },
} = require('@kafkajs/confluent-schema-registry')
await registry.register(schema, { compatibility: NONE })

NOTE: If the subject already has an overridden compatibility setting that differs, the client will throw an error (ConfluentSchemaRegistryCompatibilityError).

Subjects

Each schema is registered under a subject. In Avro, this subject is generated by concatenating the schema namespace and the schema name with a separator. For example, the following schema would get the subject com.example.Simple:

@namespace("com.example")
protocol SimpleProto {
  record Simple {
    string foo;
  }
}

registry.register accepts a subject option to override the subject entirely:

await registry.register(schema, { subject: 'my-fixed-subject' })

If you just want to change the separator used when automatically creating the subject, use the separator option:

// This would result in "com.example-Simple"
await registry.register(schema, { separator: '-' })

Other schema types

For non-Avro schema types, the subject option is required, and register will throw if it is not provided.

Encoding data

To encode data, call registry.encode with the schema id and the payload to encode.

const payload = { fullName: 'John Doe' }
await registry.encode(id, payload)

Decoding data

The encoded payload contains the id of the schema that was used to encode it, so to decode, simply call registry.decode with the encoded payload. The corresponding schema will be downloaded from the registry if needed in order to decode the payload.

const payload = await registry.decode(buffer)
// { fullName: 'John Doe' }

registry.decode has an optional second options argument with options specific to each schema type.

Avro

With Avro you can specify a reader schema to use when decoding the message, rather than the schema registered in the registry. This can be useful if you need a projection that differs from the writer schema, or if you want to decode with a different schema version than was used to encode the message.

import { SchemaType, readAVSCAsync } from '@kafkajs/confluent-schema-registry'

// Reader schemas are defined in AVSC format
const readerSchema = await readAVSCAsync('path/to/schema.avsc')

const payload = await registry.decode(buffer, {
  [SchemaType.AVRO]: { readerSchema },
})

Configuration

Retry

By default, all GET requests will retry three times in case of failure. If you want to tweak this config you can do:

const registry = new SchemaRegistry({
  host: 'http://localhost:8081',
  retry: {
    maxRetryTimeInSecs: 5,
    initialRetryTimeInSecs: 0.1,
    factor: 0.2, // randomization factor
    multiplier: 2, // exponential factor
    retries: 3, // max retries
  },
})
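As a rough illustration of how these parameters interact, each retry waits on the order of initialRetryTimeInSecs * multiplier^n, capped at maxRetryTimeInSecs. The sketch below ignores the randomization applied via factor, which is an implementation detail, and the helper is not part of the library's API:

```javascript
// Sketch only: approximate wait time before each retry, ignoring the
// randomization applied by `factor`.
function approxBackoffSchedule({ initialRetryTimeInSecs, multiplier, retries, maxRetryTimeInSecs }) {
  const waits = []
  for (let n = 0; n < retries; n++) {
    waits.push(Math.min(initialRetryTimeInSecs * multiplier ** n, maxRetryTimeInSecs))
  }
  return waits
}

approxBackoffSchedule({
  initialRetryTimeInSecs: 0.1,
  multiplier: 2,
  retries: 3,
  maxRetryTimeInSecs: 5,
}) // ≈ [0.1, 0.2, 0.4] seconds
```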

Basic auth

It's also possible to configure basic auth:

const registry = new SchemaRegistry({
  host: 'http://localhost:8081',
  auth: {
    username: '***',
    password: '***',
  },
})

HTTP Agent

You can configure the behavior of HTTP requests to the schema registry API by passing in an instance of an Agent.

import { Agent } from 'http'

const agent = new Agent({ keepAlive: true })
const registry = new SchemaRegistry({
  host: 'http://localhost:8081',
  agent,
})

Schema type options

The second argument to the SchemaRegistry constructor is an object with keys for each SchemaType.

Avro

The Avro schema type options are passed directly to avsc.Type.forSchema as the opts argument. For example:

import { SchemaRegistry, SchemaType } from '@kafkajs/confluent-schema-registry'

const options = {
  [SchemaType.AVRO]: {
    // DecimalType stands in for your own avsc logical type implementation
    logicalTypes: { decimal: DecimalType },
  },
}

const registry = new SchemaRegistry({ host: 'http://localhost:8081' }, options)

Protobuf

The only available option is messageName, which is used to select which message in a schema containing multiple messages to use for encoding/decoding the payload. If omitted, the first message type in the schema is used.

const options = {
  [SchemaType.PROTOBUF]: {
    messageName: 'CustomMessage',
  },
}

JSON Schema

The JSON Schema schema type options are passed to the Ajv constructor. For example:

const options = {
  [SchemaType.JSON]: {
    strict: true,
  },
}

Alternatively, you can provide a custom Ajv instance using the ajvInstance option. This can be useful if you need to configure Ajv outside of what the constructor parameters allow.

const Ajv = require('ajv')

const options = {
  [SchemaType.JSON]: {
    ajvInstance: new Ajv(),
  },
}