Migrating from v1 to v2
In version 1, @kafkajs/confluent-schema-registry only supported Avro schemas, and the API was very Avro specific. In version 2, support for JSON Schema and Protobuf was added, which necessitated some changes to the public API.
We have tried to make these changes as unobtrusive as possible by keeping the vast majority of them backwards compatible. In fact, for JavaScript users, the changes are completely backwards compatible for Avro schemas.
For TypeScript users, however, or if you are migrating from Avro to one of the other schema types, there are a few things to keep in mind.
The main change is that the core Schema type has changed from an Avro specific schema to a generic schema interface for all schema types. The old schema looked like this:
interface AvroSchema {
  name: string
  namespace?: string
  type: 'record'
  fields: any[]
  toBuffer(payload: object): Buffer
  fromBuffer(buffer: Buffer, resolver?: Resolver, noCheck?: boolean): any
  isValid(
    payload: object,
    opts?: { errorHook: (path: Array<string>, value: any, type?: any) => void },
  ): boolean
}
This is still the case for Avro schemas, but for Protobuf or JSON Schema, a more generic schema type is used:
interface Schema {
  toBuffer(payload: object): Buffer
  fromBuffer(buffer: Buffer, resolver?: Resolver, noCheck?: boolean): any
  isValid(
    payload: object,
    opts?: { errorHook: (path: Array<string>, value: any, type?: any) => void },
  ): boolean
}
If you have code that uses any of the Avro specific fields on the schema (for example, one returned by getSchema), you may need to first narrow the type to AvroSchema:
import { AvroSchema, Schema } from '@kafkajs/confluent-schema-registry'

// Type guard: only Avro schemas carry a `name` field
function isAvroSchema(schema: AvroSchema | Schema): schema is AvroSchema {
  return (schema as AvroSchema).name != null
}

const schema = await registry.getSchema(registryId)

if (isAvroSchema(schema)) {
  // schema is now `AvroSchema`
  const { name, namespace, type, fields } = schema
}
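The same narrowing idea can be sketched without the library, which may help when unit testing code that branches on the schema type. This is a minimal illustration under stated assumptions: `SchemaLike`, `AvroSchemaLike`, `isAvroSchemaLike` and `describeSchema` are hypothetical names, and the interfaces are simplified local stand-ins, not the library's exported types.

```typescript
// Simplified local stand-ins for the library's types (illustration only).
interface SchemaLike {
  isValid(payload: object): boolean
}

interface AvroSchemaLike extends SchemaLike {
  name: string
  namespace?: string
}

// Type guard: treat a schema as Avro only if it carries a `name` field.
function isAvroSchemaLike(schema: SchemaLike): schema is AvroSchemaLike {
  return (schema as AvroSchemaLike).name != null
}

// Hypothetical helper: returns the qualified Avro name, or a generic label
// for schemas that do not expose Avro specific fields.
function describeSchema(schema: SchemaLike): string {
  if (isAvroSchemaLike(schema)) {
    return schema.namespace ? `${schema.namespace}.${schema.name}` : schema.name
  }
  return '(non-Avro schema)'
}
```

Keeping the Avro specific access behind a single guard like this means the rest of the code can depend on the generic schema shape only.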
Adapting to new APIs
This is entirely optional, as all the old APIs should be retained (with the above caveat about the schema type). If you want to adapt your code to the new API to make future migrations easier, however, these are the changes you would need to make.
Configuring serialization libraries
In version 1, the client constructor took a second options argument with a single option, forSchemaOptions, that was passed directly to avsc.Type.forSchema as the opts argument.
const { SchemaRegistry } = require('@kafkajs/confluent-schema-registry')

const registry = new SchemaRegistry(
  { host: 'http://localhost:8081' },
  { forSchemaOptions: { noAnonymousTypes: true } }
)
Since we now support multiple schema types, these options have been moved one level into a schema type specific option without the forSchemaOptions key:
const { SchemaRegistry, SchemaType } = require('@kafkajs/confluent-schema-registry')

const registry = new SchemaRegistry(
  { host: 'http://localhost:8081' },
  {
    [SchemaType.AVRO]: { noAnonymousTypes: true },
    // This allows you to also pass options for Protobuf and JSON Schema
    [SchemaType.JSON]: { strict: true },
    [SchemaType.PROTOBUF]: { messageName: 'CustomMessage' }
  }
)
See Schema Type Options for more information.
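If an application builds its options object in one place, the move from forSchemaOptions to a per-type key can be expressed as a small conversion function. The sketch below is self-contained and makes several assumptions: the local `SchemaType` enum is a stand-in for the library's export (its string values are assumed), and `LegacyOptions`, `SchemaTypeOptions` and `migrateOptions` are hypothetical names introduced for illustration.

```typescript
// Stand-in for the library's SchemaType export; the string values are an assumption.
enum SchemaType {
  AVRO = 'AVRO',
  JSON = 'JSON',
  PROTOBUF = 'PROTOBUF',
}

// Shape of the version 1 options argument.
interface LegacyOptions {
  forSchemaOptions?: Record<string, unknown>
}

// Shape of the version 2 options argument: one optional entry per schema type.
type SchemaTypeOptions = Partial<Record<SchemaType, Record<string, unknown>>>

// Hypothetical helper: moves the old forSchemaOptions value under the Avro key.
function migrateOptions(legacy: LegacyOptions): SchemaTypeOptions {
  return legacy.forSchemaOptions ? { [SchemaType.AVRO]: legacy.forSchemaOptions } : {}
}
```

The result could then be passed as the second constructor argument in place of the version 1 options object.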
Registering schemas
In version 1, the schema type was implicitly Avro, so you would just pass in the schema from readAVSCAsync directly:
const schema = await readAVSCAsync('path/to/schema.avsc')
await registry.register(schema)
In version 2, there are two major changes:
- The type of the schema needs to be set to one of SchemaType.
- The schema itself is now a string, instead of an object.
We call this new type a ConfluentSchema:
interface ConfluentSchema {
  type: SchemaType
  schema: string
}
const { SchemaType } = require('@kafkajs/confluent-schema-registry')
const schema = await readAVSCAsync('path/to/schema.avsc')
const schemaString = JSON.stringify(schema)
await registry.register({ type: SchemaType.AVRO, schema: schemaString })
Getting registry id by schema
Similar to register, getting the registry id by schema used to take an AvroSchema, and now takes a ConfluentSchema.
Version 1:
const schema = await readAVSCAsync('path/to/schema.avsc')
await registry.getRegistryIdBySchema('subject', schema)
Version 2:
const schema = await readAVSCAsync('path/to/schema.avsc')
const schemaString = JSON.stringify(schema)
await registry.getRegistryIdBySchema('subject', { type: SchemaType.AVRO, schema: schemaString })
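Because register and getRegistryIdBySchema now take the same ConfluentSchema shape, a codebase that is migrating gradually might funnel both call sites through one normalizing helper. The following is a minimal sketch, not part of the library: the `SchemaType` enum is a local stand-in with assumed string values, and `LegacyAvroSchema` and `toConfluentSchema` are hypothetical names.

```typescript
// Stand-in for the library's SchemaType export; the string values are an assumption.
enum SchemaType {
  AVRO = 'AVRO',
  JSON = 'JSON',
  PROTOBUF = 'PROTOBUF',
}

interface ConfluentSchema {
  type: SchemaType
  schema: string
}

// A version 1 style argument: the parsed Avro schema object itself (simplified).
type LegacyAvroSchema = { type: 'record'; name: string; fields: unknown[] }

// Hypothetical helper: accepts either shape and always returns a ConfluentSchema.
function toConfluentSchema(input: LegacyAvroSchema | ConfluentSchema): ConfluentSchema {
  // A ConfluentSchema has a string `schema` field; a raw Avro record does not.
  if ('schema' in input && typeof input.schema === 'string') {
    return input
  }
  // Anything else is treated as a parsed Avro schema and serialized to a string.
  return { type: SchemaType.AVRO, schema: JSON.stringify(input) }
}
```

With a helper along these lines, a call such as `registry.register(toConfluentSchema(schema))` would work whether `schema` is still the object returned by readAVSCAsync or has already been converted.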