
Migrating from v1 to v2

In version 1, @kafkajs/confluent-schema-registry supported only Avro schemas, and the API was very Avro-specific. Version 2 added support for JSON Schema and Protobuf, which necessitated some changes to the public API.

We have tried to make these changes as unobtrusive as possible by keeping the vast majority of them backwards compatible. In fact, for JavaScript users, the changes are completely backwards compatible for Avro schemas.

For TypeScript users, however, or if you are migrating from Avro to one of the other schema types, there are a few things to keep in mind.

The main change is that the core Schema type has changed from an Avro-specific schema to a generic schema interface shared by all schema types. The old schema looked like this:

interface AvroSchema {
  name: string
  namespace?: string
  type: 'record'
  fields: any[]
  toBuffer(payload: object): Buffer
  fromBuffer(buffer: Buffer, resolver?: Resolver, noCheck?: boolean): any
  isValid(
    payload: object,
    opts?: { errorHook: (path: Array<string>, value: any, type?: any) => void },
  ): boolean
}

This is still the case for Avro schemas, but for Protobuf or JSON Schema, a more generic schema type is used:

interface Schema {
  toBuffer(payload: object): Buffer
  fromBuffer(buffer: Buffer, resolver?: Resolver, noCheck?: boolean): any
  isValid(
    payload: object,
    opts?: { errorHook: (path: Array<string>, value: any, type?: any) => void },
  ): boolean
}

If you have code that uses any of the Avro-specific fields on the schema (for example, on a schema returned by getSchema), you may need to first narrow the type to AvroSchema:

import { AvroSchema, Schema } from '@kafkajs/confluent-schema-registry'

function isAvroSchema(schema: AvroSchema | Schema): schema is AvroSchema {
  return (schema as AvroSchema).name != null
}

const schema = await registry.getSchema(registryId)

if (isAvroSchema(schema)) {
  // schema is now `AvroSchema`
  const { name, namespace, type, fields } = schema
}
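
To see the guard in isolation, here is a minimal TypeScript sketch that uses local stand-in interfaces rather than the library's own exports. The stand-ins are trimmed to the members needed here, and describeSchema is a hypothetical helper, not part of the package.

```typescript
// Local stand-ins for the library's types, trimmed for illustration only.
interface Schema {
  toBuffer(payload: object): Uint8Array
  isValid(payload: object): boolean
}

interface AvroSchema extends Schema {
  name: string
  namespace?: string
  type: 'record'
  fields: unknown[]
}

// Same guard as above: only Avro schemas carry a `name`.
function isAvroSchema(schema: AvroSchema | Schema): schema is AvroSchema {
  return (schema as AvroSchema).name != null
}

// Hypothetical helper: returns the qualified name for Avro schemas,
// and undefined for any other schema type.
function describeSchema(schema: AvroSchema | Schema): string | undefined {
  if (!isAvroSchema(schema)) return undefined
  return schema.namespace ? `${schema.namespace}.${schema.name}` : schema.name
}

// A generic (non-Avro) schema only has the shared methods.
const genericSchema: Schema = {
  toBuffer: () => new Uint8Array(),
  isValid: () => true,
}

// An Avro-shaped schema adds the Avro-specific fields.
const avroLikeSchema: AvroSchema = {
  ...genericSchema,
  name: 'User',
  namespace: 'com.example',
  type: 'record',
  fields: [],
}

const avroName = describeSchema(avroLikeSchema)
const genericName = describeSchema(genericSchema)
```

Code that only calls toBuffer, fromBuffer or isValid does not need the guard, since those methods exist on both types.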

Adapting to new APIs

This is entirely optional, as all the old APIs should be retained (with the above caveat about the schema type), but if you want to adapt your code to the new API to make future migrations easier, these are the changes you would need to make.

Configuring serialization libraries

In version 1, the client constructor took a second options argument with a single option, forSchemaOptions, which was passed directly to avsc.Type.forSchema as the opts argument.

const { SchemaRegistry } = require('@kafkajs/confluent-schema-registry')

const registry = new SchemaRegistry(
  { host: 'http://localhost:8081' },
  { forSchemaOptions: { noAnonymousTypes: true } }
)

Since we now support multiple schema types, these options have moved up one level: they are passed under a key for the schema type, without the forSchemaOptions key:

const { SchemaRegistry, SchemaType } = require('@kafkajs/confluent-schema-registry')

const registry = new SchemaRegistry(
  { host: 'http://localhost:8081' },
  {
    [SchemaType.AVRO]: { noAnonymousTypes: true },

    // This allows you to also pass options for Protobuf and JSON Schema
    [SchemaType.JSON]: { strict: true },

    [SchemaType.PROTOBUF]: { messageName: 'CustomMessage' },
  }
)

See Schema Type Options for more information.
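
If you build the options object in one place, the move can be done mechanically. The following is a rough TypeScript sketch of that conversion. The SchemaType enum is a local stand-in with assumed string values, and migrateOptions, LegacyOptions and SchemaTypeOptions are hypothetical names, not library exports.

```typescript
// Local stand-in for the library's SchemaType enum; the string values are assumed.
enum SchemaType {
  AVRO = 'AVRO',
  JSON = 'JSON',
  PROTOBUF = 'PROTOBUF',
}

// Shape of the version 1 options argument.
interface LegacyOptions {
  forSchemaOptions?: Record<string, unknown>
}

// Shape of the version 2 options argument: one optional entry per schema type.
type SchemaTypeOptions = Partial<Record<SchemaType, Record<string, unknown>>>

// Hypothetical helper: moves the contents of `forSchemaOptions`
// under the Avro key and drops the `forSchemaOptions` wrapper.
function migrateOptions(legacy: LegacyOptions): SchemaTypeOptions {
  if (legacy.forSchemaOptions === undefined) return {}
  return { [SchemaType.AVRO]: { ...legacy.forSchemaOptions } }
}

const v2Options = migrateOptions({ forSchemaOptions: { noAnonymousTypes: true } })
const emptyOptions = migrateOptions({})
```

With the real package, the result would be passed as the second constructor argument, in place of the hand-written object shown earlier.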

Registering schemas

In version 1, the schema type was implicitly Avro, so you would just pass in the schema from readAVSCAsync directly:

const schema = await readAVSCAsync('path/to/schema.avsc')
await registry.register(schema)

In version 2, there are two major changes:

  1. The type of the schema needs to be set to one of SchemaType.
  2. The schema itself is now a string, instead of an object.

We call this new type a ConfluentSchema:

interface ConfluentSchema {
  type: SchemaType
  schema: string
}

Registering an Avro schema in version 2 then looks like this:

const { SchemaType, readAVSCAsync } = require('@kafkajs/confluent-schema-registry')

const schema = await readAVSCAsync('path/to/schema.avsc')
const schemaString = JSON.stringify(schema)
await registry.register({ type: SchemaType.AVRO, schema: schemaString })
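
If many call sites still hold parsed Avro schema objects, the wrapping step can be factored out. Below is a small TypeScript sketch of such a helper. SchemaType and ConfluentSchema are redeclared locally as stand-ins (with assumed enum values) so the snippet runs on its own, and toConfluentSchema is a hypothetical name.

```typescript
// Local stand-ins so the sketch is self-contained; the enum values are assumed.
enum SchemaType {
  AVRO = 'AVRO',
  JSON = 'JSON',
  PROTOBUF = 'PROTOBUF',
}

interface ConfluentSchema {
  type: SchemaType
  schema: string
}

// Hypothetical helper: wraps a parsed Avro schema object (the version 1 input)
// into the version 2 shape by tagging the type and serializing to a string.
function toConfluentSchema(avroSchema: object): ConfluentSchema {
  return { type: SchemaType.AVRO, schema: JSON.stringify(avroSchema) }
}

// Example input, shaped like the object readAVSCAsync would resolve to.
const parsedSchema = {
  type: 'record',
  name: 'User',
  namespace: 'com.example',
  fields: [{ name: 'id', type: 'string' }],
}

const confluentSchema = toConfluentSchema(parsedSchema)
```

The same wrapped value fits any version 2 method that expects a ConfluentSchema.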

Getting registry id by schema

Similar to register, getting the registry id by schema used to take an AvroSchema, and now takes a ConfluentSchema.

Version 1:

const schema = await readAVSCAsync('path/to/schema.avsc')
await registry.getRegistryIdBySchema('subject', schema)

Version 2:

const schema = await readAVSCAsync('path/to/schema.avsc')
const schemaString = JSON.stringify(schema)
await registry.getRegistryIdBySchema('subject', { type: SchemaType.AVRO, schema: schemaString })