Connect to Aiven for Apache Kafka® with Java
You can connect to an Aiven for Apache Kafka® service using the Java client library for Apache Kafka.
The examples below show different authentication options: SSL, SASL-SSL, schema registry-based authentication. For more details, see Authentication types.
Prerequisites
Before you begin:
- 
Add the following dependencies to your Java project using your preferred artifact repository, such as Maven Central: - kafka-clients: Provides the core Kafka client library.
- kafka-avro-serializer: Enables Avro serialization and integration with Schema Registry.
 
- 
In the Aiven Console, go to the Overview page of your Aiven for Apache Kafka service and choose your authentication method: SSL authentication - In the Connection information section:
- Set Authentication method to Client certificate (if available).
- Download:
- service.key(Access key)
- service.cert(Access certificate)
- ca.pem(CA certificate)
 
 
- Create the keystore and truststore:
- client.keystore.p12
- client.truststore.jks
 
 SASL authentication - Enable SASL for your Kafka service.
- In the Connection information section:
- Set Authentication method to SASL.
- Download ca.pem(CA certificate).
- Note the Password for SASL authentication.
 
 
- In the Connection information section:
The examples below show only the filenames for the keystore and truststore files. In production environments, use the full file paths.
Variables
| Variable | Description | 
|---|---|
| HOST | Hostname of your Kafka service | 
| USER_NAME | Username for SASL or Schema Registry authentication | 
| SSL_PORT | SSL port for your Kafka service | 
| SASL_PORT | SASL port for your Kafka service | 
| SASL_PASSWORD | Password for SASL authentication | 
| KEYSTORE_LOCATION | Path to your client.keystore.p12file | 
| TRUSTSTORE_LOCATION | Path to your client.truststore.jksfile | 
| KEYSTORE_PASSWORD | Password for the keystore | 
| KEY_PASSWORD | Password for the private key (if different from the keystore password) | 
| TRUSTSTORE_PASSWORD | Password for the truststore | 
| SCHEMA_REGISTRY_URL | URL of the Schema Registry | 
| SCHEMA_REGISTRY_USER | Username for Schema Registry authentication | 
| SCHEMA_REGISTRY_PASS | Password for Schema Registry authentication | 
Connect a producer
Set up properties to connect to the Kafka cluster and create a producer.
With SSL authentication
Properties properties = new Properties();
properties.put("bootstrap.servers", "HOST:SSL_PORT");
properties.put("security.protocol", "SSL");
properties.put("ssl.keystore.type", "PKCS12");
properties.put("ssl.keystore.location", "KEYSTORE_LOCATION");
properties.put("ssl.keystore.password", "KEYSTORE_PASSWORD");
properties.put("ssl.key.password", "KEY_PASSWORD");
properties.put("ssl.truststore.type", "JKS");
properties.put("ssl.truststore.location", "TRUSTSTORE_LOCATION");
properties.put("ssl.truststore.password", "TRUSTSTORE_PASSWORD");
// Create a producer with StringSerializer for key and value
KafkaProducer<String, String> producer = new KafkaProducer<>(properties, new StringSerializer(), new StringSerializer());
With SASL authentication
String sasl_username = "USER_NAME";
String sasl_password = "SASL_PASSWORD";
String jaasTemplate = "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"%s\" password=\"%s\";";
String jaasConfig = String.format(jaasTemplate, sasl_username, sasl_password);
Properties properties = new Properties();
properties.put("bootstrap.servers", "HOST:SASL_PORT");
properties.put("security.protocol", "SASL_SSL");
properties.put("sasl.mechanism", "SCRAM-SHA-256");
properties.put("sasl.jaas.config", jaasConfig);
properties.put("ssl.truststore.type", "JKS");
properties.put("ssl.truststore.location", "TRUSTSTORE_LOCATION");
properties.put("ssl.truststore.password", "TRUSTSTORE_PASSWORD");
// Create a producer with StringSerializer for key and value
KafkaProducer<String, String> producer = new KafkaProducer<>(properties, new StringSerializer(), new StringSerializer());
With Schema Registry (Karapace)
Properties properties = new Properties();
properties.put("bootstrap.servers", "HOST:SSL_PORT");
properties.put("security.protocol", "SSL");
properties.put("ssl.keystore.type", "PKCS12");
properties.put("ssl.keystore.location", "KEYSTORE_LOCATION");
properties.put("ssl.keystore.password", "KEYSTORE_PASSWORD");
properties.put("ssl.key.password", "KEY_PASSWORD");
properties.put("ssl.truststore.type", "JKS");
properties.put("ssl.truststore.location", "TRUSTSTORE_LOCATION");
properties.put("ssl.truststore.password", "TRUSTSTORE_PASSWORD");
// Configure Schema Registry-specific properties
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName());
properties.setProperty("basic.auth.credentials.source", "USER_INFO");
properties.setProperty("schema.registry.basic.auth.user.info", "SCHEMA_REGISTRY_USER:SCHEMA_REGISTRY_PASS");
properties.setProperty("auto.register.schemas", "false");
properties.put(KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG, "SCHEMA_REGISTRY_URL");
// Create a producer with StringSerializer for key and AvroSerializer for value
KafkaProducer<String, GenericRecord> producer = new KafkaProducer<>(properties);
// Define schema
String valueSchemaString = "\"type\": \"record\", \"name\": \"simple\"," +
    "\"namespace\": \"example.avro\", " +
    "\"fields\": [\"name\": \"name\", \"type\": \"string\"]";
Schema avroValueSchema = new Schema.Parser().parse(valueSchemaString);
// Produce a record
GenericRecord thisValueRecord = new GenericData.Record(avroValueSchema);
thisValueRecord.put("name", "Alice");
producer.send(new ProducerRecord<>("your-topic", "key", thisValueRecord));
Connect a consumer
Set up properties to connect to the Kafka cluster and create a consumer.
With SSL authentication
String group_id = "groupid";
Properties properties = new Properties();
properties.put("bootstrap.servers", "HOST:SSL_PORT");
properties.put("security.protocol", "SSL");
properties.put("ssl.keystore.type", "PKCS12");
properties.put("ssl.keystore.location", "KEYSTORE_LOCATION");
properties.put("ssl.keystore.password", "KEYSTORE_PASSWORD");
properties.put("ssl.key.password", "KEY_PASSWORD");
properties.put("ssl.truststore.type", "JKS");
properties.put("ssl.truststore.location", "TRUSTSTORE_LOCATION");
properties.put("ssl.truststore.password", "TRUSTSTORE_PASSWORD");
properties.put("group.id", group_id);
// Create a consumer with StringDeserializer
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties, new StringDeserializer(), new StringDeserializer());
With SASL authentication
String group_id = "groupid";
String sasl_username = "USER_NAME";
String sasl_password = "SASL_PASSWORD";
String jaasTemplate = "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"%s\" password=\"%s\";";
String jaasConfig = String.format(jaasTemplate, sasl_username, sasl_password);
Properties properties = new Properties();
properties.put("bootstrap.servers", "HOST:SASL_PORT");
properties.put("security.protocol", "SASL_SSL");
properties.put("sasl.mechanism", "SCRAM-SHA-256");
properties.put("sasl.jaas.config", jaasConfig);
properties.put("ssl.truststore.type", "JKS");
properties.put("ssl.truststore.location", "TRUSTSTORE_LOCATION");
properties.put("ssl.truststore.password", "TRUSTSTORE_PASSWORD");
properties.put("group.id", group_id);
// Create a consumer with StringDeserializer
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties, new StringDeserializer(), new StringDeserializer());
With Schema Registry (Karapace)
String group_id = "groupid";
Properties properties = new Properties();
properties.put("bootstrap.servers", "HOST:SSL_PORT");
properties.put("security.protocol", "SSL");
properties.put("ssl.keystore.type", "PKCS12");
properties.put("ssl.keystore.location", "KEYSTORE_LOCATION");
properties.put("ssl.keystore.password", "KEYSTORE_PASSWORD");
properties.put("ssl.key.password", "KEY_PASSWORD");
properties.put("ssl.truststore.type", "JKS");
properties.put("ssl.truststore.location", "TRUSTSTORE_LOCATION");
properties.put("ssl.truststore.password", "TRUSTSTORE_PASSWORD");
properties.put("group.id", group_id);
// Configure Schema Registry-specific properties
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class.getName());
properties.setProperty("basic.auth.credentials.source", "USER_INFO");
properties.setProperty("schema.registry.basic.auth.user.info", "SCHEMA_REGISTRY_USER:SCHEMA_REGISTRY_PASS");
properties.put(KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG, "SCHEMA_REGISTRY_URL");
// Create a consumer with StringDeserializer for key and AvroDeserializer for value
KafkaConsumer<String, GenericRecord> consumer = new KafkaConsumer<>(properties);
// Process records
ConsumerRecords<String, GenericRecord> records = consumer.poll(Duration.ofMillis(100));