Usage
Typical usage consists of uploading one or more schemas to the registry, encoding data using the registered schemas, and/or decoding encoded data by getting the schemas from the registry.
Creating the registry client
const { SchemaRegistry, SchemaType } = require('@kafkajs/confluent-schema-registry')
const registry = new SchemaRegistry({ host: 'http://localhost:8081' })
For more configuration options, see configuration.
Uploading schemas
The schemas can be registered with the schema registry using registry.register({ type: SchemaType, schema: string }), which resolves to an object containing the schema id. This schema id is later used when encoding.
const schema = {
  type: SchemaType.AVRO, // or SchemaType.JSON, SchemaType.PROTOBUF
  schema: "string" // the schema definition as a string
}
const options = {
  subject: "string" // optional for Avro, see Subjects
}
await registry.register(schema, options)
Avro
const schema = `
  {
    "type": "record",
    "name": "RandomTest",
    "namespace": "examples",
    "fields": [{ "type": "string", "name": "fullName" }]
  }
`
const { id } = await registry.register({ type: SchemaType.AVRO, schema })
To simplify working with Avro schemas and integrating with existing tooling, some utility functions are available. Schemas can be defined in either AVSC or AVDL format, and are read using readAVSCAsync and avdlToAVSCAsync respectively.
Note that these functions return objects rather than strings, but they can be passed directly to register as the schema argument and will be stringified internally.
const { SchemaType, readAVSCAsync, avdlToAVSCAsync } = require('@kafkajs/confluent-schema-registry')
// From an avsc file
const avscSchema = await readAVSCAsync('path/to/schema.avsc')
const { id: avscId } = await registry.register({ type: SchemaType.AVRO, schema: avscSchema }) // e.g. { id: 2 }
// From an avdl file
const avdlSchema = await avdlToAVSCAsync('path/to/protocol.avdl')
const { id: avdlId } = await registry.register({ type: SchemaType.AVRO, schema: avdlSchema }) // e.g. { id: 3 }
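The note above says that object schemas are stringified internally. As a rough illustration of what that means for callers, the sketch below normalizes either form to a string. toSchemaString is a hypothetical helper written for this example; it is not part of the library, and the library's actual handling may differ.

```javascript
// Hypothetical helper (not part of @kafkajs/confluent-schema-registry):
// accepts a schema as a JSON string or as a parsed object and returns a string.
function toSchemaString(schema) {
  return typeof schema === 'string' ? schema : JSON.stringify(schema)
}

// A parsed schema object, similar in shape to what readAVSCAsync resolves to
const parsed = {
  type: 'record',
  name: 'RandomTest',
  namespace: 'examples',
  fields: [{ type: 'string', name: 'fullName' }],
}

const asString = toSchemaString(parsed)
console.log(typeof asString) // string
console.log(toSchemaString(asString) === asString) // true
```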
Subject
For Avro schemas, the subject is automatically inferred from the schema if options.subject is not set.
See Subjects for more information on subjects.
JSON Schema
const { SchemaType } = require('@kafkajs/confluent-schema-registry')
const schema = `
  {
    "definitions" : {
      "record:examples.Person" : {
        "type" : "object",
        "required" : [ "fullName" ],
        "additionalProperties" : false,
        "properties" : {
          "fullName" : {
            "type" : "string"
          }
        }
      }
    },
    "$ref" : "#/definitions/record:examples.Person"
  }
`
const { id } = await registry.register({ type: SchemaType.JSON, schema })
Protobuf
const { SchemaType } = require('@kafkajs/confluent-schema-registry')
const schema = `
  package examples;
  message RandomTest {
    required string fullName = 1;
  }
`
const { id } = await registry.register({ type: SchemaType.PROTOBUF, schema })
Compatibility
The compatibility of the schema will be whatever the global default is (typically BACKWARD).
It's possible to override this for the specific subject by setting it like so:
const {
  COMPATIBILITY: { NONE },
} = require('@kafkajs/confluent-schema-registry')
await registry.register(schema, { compatibility: NONE })
NOTE:
If the subject already has an overridden compatibility setting and it's different, the client will throw an error (ConfluentSchemaRegistryCompatibilityError).
Subjects
Each schema is registered under a subject.
In Avro, this subject is generated by concatenating the schema namespace and the schema name with a separator. For example, the following schema would get the subject com.example.Simple:
@namespace("com.example")
protocol SimpleProto {
  record Simple {
    string foo;
  }
}
registry.register accepts a subject option to override the subject entirely:
await registry.register(schema, { subject: 'my-fixed-subject' })
If you just want to change the separator used when automatically creating the subject, use the separator option:
// This would result in "com.example-Simple"
await registry.register(schema, { separator: '-' })
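The naming rule described above can be sketched in a few lines. avroSubject is a hypothetical helper that mirrors the documented behavior (namespace, then separator, then name); it is not the library's internal code, and rejecting a schema without a namespace is an assumption of this sketch.

```javascript
// Hypothetical helper: derives a subject from an Avro schema's namespace and name.
// Mirrors the documented rule only; not the library's own implementation.
function avroSubject({ namespace, name }, separator = '.') {
  // Assumption of this sketch: a namespace is required to build a subject
  if (!namespace) {
    throw new Error(`Cannot derive a subject for "${name}" without a namespace`)
  }
  return `${namespace}${separator}${name}`
}

const simple = { namespace: 'com.example', name: 'Simple' }
console.log(avroSubject(simple)) // com.example.Simple
console.log(avroSubject(simple, '-')) // com.example-Simple
```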
Other schema types
For non-Avro schema types, subject is required and the method will throw if not provided.
Encoding data
To encode data, call registry.encode with the schema id and the payload to encode.
const payload = { fullName: 'John Doe' } // field name must match the registered schema
await registry.encode(id, payload)
Decoding data
The encoded payload contains the id of the schema used to encode it, so to decode, simply call registry.decode with the encoded payload. The corresponding schema will be downloaded from the registry if needed in order to decode the payload.
const payload = await registry.decode(buffer)
// { fullName: 'John Doe' }
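The schema id travels with the message because the encoded buffer follows the Confluent wire format: a zero magic byte, the schema id as a 4-byte big-endian integer, then the serialized payload. The sketch below reads the id from such a buffer by hand. readSchemaId is a hypothetical helper for illustration only; in practice registry.decode takes care of this.

```javascript
const MAGIC_BYTE = 0

// Hypothetical helper: extracts the schema id from a Confluent-framed message.
function readSchemaId(buffer) {
  if (buffer.length < 5 || buffer.readUInt8(0) !== MAGIC_BYTE) {
    throw new Error('Buffer is not in the Confluent wire format')
  }
  // Bytes 1..4 hold the schema id as a big-endian 32-bit integer
  return buffer.readInt32BE(1)
}

// Build a framed message by hand for demonstration purposes
const header = Buffer.alloc(5)
header.writeUInt8(MAGIC_BYTE, 0)
header.writeInt32BE(2, 1)
const framed = Buffer.concat([header, Buffer.from('serialized-payload')])

console.log(readSchemaId(framed)) // 2
```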
registry.decode has an optional second options argument with options specific to each schema type.
Avro
With Avro you can specify a reader schema to use to decode the message, rather than using the schema registered in the registry. This can be useful if you need a projection that is different from the writer schema, or if you want to decode a message with a different schema version than the one used to encode it.
import { readAVSCAsync, SchemaType } from '@kafkajs/confluent-schema-registry'
// Load the reader schema from an avsc file (use avdlToAVSCAsync for avdl files)
const readerSchema = await readAVSCAsync('path/to/schema.avsc')
const payload = await registry.decode(buffer, {
  [SchemaType.AVRO]: { readerSchema }
})
Configuration
Retry
By default, all GET requests will retry three times in case of failure. If you want to tweak this config you can do:
const registry = new SchemaRegistry({
  host: 'http://localhost:8081',
  retry: {
    maxRetryTimeInSecs: 5,
    initialRetryTimeInSecs: 0.1,
    factor: 0.2, // randomization factor
    multiplier: 2, // exponential factor
    retries: 3, // max retries
  },
})
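The option names suggest an exponential backoff. As a rough sketch of how such parameters usually interact, the example below computes the wait before each retry, assuming each wait is the previous one times multiplier and that maxRetryTimeInSecs caps a single wait. The randomization controlled by factor is left out to keep the output deterministic. backoffDelaysInSecs is a hypothetical function and not the library's retry implementation.

```javascript
// Hypothetical illustration of exponential backoff without jitter.
// Assumption: maxRetryTimeInSecs limits each individual wait.
function backoffDelaysInSecs({ initialRetryTimeInSecs, multiplier, maxRetryTimeInSecs, retries }) {
  const delays = []
  let delay = initialRetryTimeInSecs
  for (let attempt = 0; attempt < retries; attempt++) {
    delays.push(Math.min(delay, maxRetryTimeInSecs))
    delay *= multiplier
  }
  return delays
}

const delays = backoffDelaysInSecs({
  initialRetryTimeInSecs: 0.1,
  multiplier: 2,
  maxRetryTimeInSecs: 5,
  retries: 3,
})
console.log(delays) // [ 0.1, 0.2, 0.4 ]
```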
Basic auth
It's also possible to configure basic auth:
const registry = new SchemaRegistry({
  host: 'http://localhost:8081',
  auth: {
    username: '***',
    password: '***',
  },
})
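Credentials are usually kept out of source code. One possible approach, sketched below, reads them from environment variables and returns an object in the shape of the auth option shown above. authFromEnv and the variable names SCHEMA_REGISTRY_USERNAME and SCHEMA_REGISTRY_PASSWORD are assumptions made for this example, not something the library defines.

```javascript
// Hypothetical helper: builds the auth option from environment variables.
// The variable names are assumptions made for this example.
function authFromEnv(env = process.env) {
  const username = env.SCHEMA_REGISTRY_USERNAME
  const password = env.SCHEMA_REGISTRY_PASSWORD
  if (!username || !password) {
    throw new Error('SCHEMA_REGISTRY_USERNAME and SCHEMA_REGISTRY_PASSWORD must be set')
  }
  return { username, password }
}

// Demonstration with an explicit object instead of the real environment
const auth = authFromEnv({
  SCHEMA_REGISTRY_USERNAME: 'alice',
  SCHEMA_REGISTRY_PASSWORD: 's3cret',
})
console.log(auth.username) // alice
```

The returned object could then be passed as the auth property of the SchemaRegistry constructor argument.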
HTTP Agent
Configuring the behavior of the HTTP requests towards the schema registry API can be done by passing in an instance of an Agent.
import { Agent } from 'http'
const agent = new Agent({ keepAlive: true })
const registry = new SchemaRegistry({
  host: 'http://localhost:8081',
  agent
})
Schema type options
The second argument to the SchemaRegistry constructor is an object with keys for each SchemaType.
Avro
The Avro schema type options are passed directly to avsc.Type.forSchema as the opts argument.
For example:
import { SchemaRegistry, SchemaType } from '@kafkajs/confluent-schema-registry'
const options = {
  [SchemaType.AVRO]: {
    // DecimalType stands for your own avsc LogicalType subclass, defined elsewhere
    logicalTypes: { decimal: DecimalType }
  }
}
const registry = new SchemaRegistry({ host: 'http://localhost:8081' }, options)
Protobuf
The only available option is messageName, which is used to select which message in a schema containing multiple messages to use for encoding/decoding the payload. If omitted, the first message type in the schema is used.
const options = {
  [SchemaType.PROTOBUF]: {
    messageName: 'CustomMessage'
  }
}
JSON Schema
The JSON Schema schema type options are passed to the Ajv constructor. For example:
const options = {
  [SchemaType.JSON]: {
    strict: true
  }
}
Alternatively, you can provide a custom Ajv instance using the ajvInstance option. This can be useful if you need to configure Ajv outside of what the constructor parameters allow.
const options = {
  [SchemaType.JSON]: {
    ajvInstance: new Ajv()
  }
}