Kafka
Install
| npm install @testcontainers/kafka --save-dev
|
Examples
Kafka 8.x
These examples use the following libraries:
Choose an image from the container registry and substitute IMAGE.
Produce/consume a message
| await using container = await new KafkaContainer(IMAGE).start();
await assertMessageProducedAndConsumed(container);
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33 | export async function assertMessageProducedAndConsumed(
container: StartedKafkaContainer,
additionalKafkaConfig: Partial<KafkaJS.KafkaConfig> = {},
additionalGlobalConfig: Partial<GlobalConfig> = {}
) {
const brokers = [`${container.getHost()}:${container.getMappedPort(9093)}`];
const kafka = new KafkaJS.Kafka({
kafkaJS: {
logLevel: KafkaJS.logLevel.ERROR,
brokers,
...additionalKafkaConfig,
},
...additionalGlobalConfig,
});
const producer = kafka.producer();
await producer.connect();
const consumer = kafka.consumer({ kafkaJS: { groupId: "test-group", fromBeginning: true } });
await consumer.connect();
await producer.send({ topic: "test-topic", messages: [{ value: "test message" }] });
await consumer.subscribe({ topic: "test-topic" });
const consumedMessage = await new Promise((resolve) =>
consumer.run({
eachMessage: async ({ message }) => resolve(message.value?.toString()),
})
);
expect(consumedMessage).toBe("test message");
await consumer.disconnect();
await producer.disconnect();
}
|
With SSL
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35 | const saslConfig: SaslSslListenerOptions = {
port: 9096,
sasl: {
mechanism: "SCRAM-SHA-512",
user: {
name: "app-user",
password: "userPassword",
},
},
keystore: {
content: fs.readFileSync(path.resolve(certificatesDir, "kafka.server.keystore.pfx")),
passphrase: "serverKeystorePassword",
},
truststore: {
content: fs.readFileSync(path.resolve(certificatesDir, "kafka.server.truststore.pfx")),
passphrase: "serverTruststorePassword",
},
};
await using container = await new KafkaContainer(IMAGE).withSaslSslListener(saslConfig).start();
await assertMessageProducedAndConsumed(
container,
{
brokers: [`${container.getHost()}:${container.getMappedPort(9096)}`],
ssl: true,
},
{
"sasl.mechanism": "SCRAM-SHA-512",
"sasl.username": "app-user",
"sasl.password": "userPassword",
"security.protocol": "sasl_ssl",
"ssl.ca.location": path.resolve(certificatesDir, "kafka.client.truststore.pem"),
}
);
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33 | export async function assertMessageProducedAndConsumed(
container: StartedKafkaContainer,
additionalKafkaConfig: Partial<KafkaJS.KafkaConfig> = {},
additionalGlobalConfig: Partial<GlobalConfig> = {}
) {
const brokers = [`${container.getHost()}:${container.getMappedPort(9093)}`];
const kafka = new KafkaJS.Kafka({
kafkaJS: {
logLevel: KafkaJS.logLevel.ERROR,
brokers,
...additionalKafkaConfig,
},
...additionalGlobalConfig,
});
const producer = kafka.producer();
await producer.connect();
const consumer = kafka.consumer({ kafkaJS: { groupId: "test-group", fromBeginning: true } });
await consumer.connect();
await producer.send({ topic: "test-topic", messages: [{ value: "test message" }] });
await consumer.subscribe({ topic: "test-topic" });
const consumedMessage = await new Promise((resolve) =>
consumer.run({
eachMessage: async ({ message }) => resolve(message.value?.toString()),
})
);
expect(consumedMessage).toBe("test message");
await consumer.disconnect();
await producer.disconnect();
}
|
Kafka 7.x
These examples use the following libraries:
Choose an image from the container registry and substitute IMAGE.
Produce/consume a message
| await using container = await new KafkaContainer(IMAGE).start();
await assertMessageProducedAndConsumed(container);
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33 | export async function assertMessageProducedAndConsumed(
container: StartedKafkaContainer,
additionalKafkaConfig: Partial<KafkaJS.KafkaConfig> = {},
additionalGlobalConfig: Partial<GlobalConfig> = {}
) {
const brokers = [`${container.getHost()}:${container.getMappedPort(9093)}`];
const kafka = new KafkaJS.Kafka({
kafkaJS: {
logLevel: KafkaJS.logLevel.ERROR,
brokers,
...additionalKafkaConfig,
},
...additionalGlobalConfig,
});
const producer = kafka.producer();
await producer.connect();
const consumer = kafka.consumer({ kafkaJS: { groupId: "test-group", fromBeginning: true } });
await consumer.connect();
await producer.send({ topic: "test-topic", messages: [{ value: "test message" }] });
await consumer.subscribe({ topic: "test-topic" });
const consumedMessage = await new Promise((resolve) =>
consumer.run({
eachMessage: async ({ message }) => resolve(message.value?.toString()),
})
);
expect(consumedMessage).toBe("test message");
await consumer.disconnect();
await producer.disconnect();
}
|
With SSL
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35 | await using container = await new KafkaContainer("confluentinc/cp-kafka:7.5.0")
.withSaslSslListener({
port: 9096,
sasl: {
mechanism: "SCRAM-SHA-512",
user: {
name: "app-user",
password: "userPassword",
},
},
keystore: {
content: fs.readFileSync(path.resolve(certificatesDir, "kafka.server.keystore.pfx")),
passphrase: "serverKeystorePassword",
},
truststore: {
content: fs.readFileSync(path.resolve(certificatesDir, "kafka.server.truststore.pfx")),
passphrase: "serverTruststorePassword",
},
})
.start();
await assertMessageProducedAndConsumed(
container,
{
brokers: [`${container.getHost()}:${container.getMappedPort(9096)}`],
ssl: true,
},
{
"sasl.mechanism": "SCRAM-SHA-512",
"sasl.username": "app-user",
"sasl.password": "userPassword",
"security.protocol": "sasl_ssl",
"ssl.ca.location": path.resolve(certificatesDir, "kafka.client.truststore.pem"),
}
);
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33 | export async function assertMessageProducedAndConsumed(
container: StartedKafkaContainer,
additionalKafkaConfig: Partial<KafkaJS.KafkaConfig> = {},
additionalGlobalConfig: Partial<GlobalConfig> = {}
) {
const brokers = [`${container.getHost()}:${container.getMappedPort(9093)}`];
const kafka = new KafkaJS.Kafka({
kafkaJS: {
logLevel: KafkaJS.logLevel.ERROR,
brokers,
...additionalKafkaConfig,
},
...additionalGlobalConfig,
});
const producer = kafka.producer();
await producer.connect();
const consumer = kafka.consumer({ kafkaJS: { groupId: "test-group", fromBeginning: true } });
await consumer.connect();
await producer.send({ topic: "test-topic", messages: [{ value: "test message" }] });
await consumer.subscribe({ topic: "test-topic" });
const consumedMessage = await new Promise((resolve) =>
consumer.run({
eachMessage: async ({ message }) => resolve(message.value?.toString()),
})
);
expect(consumedMessage).toBe("test message");
await consumer.disconnect();
await producer.disconnect();
}
|
With provided ZooKeeper
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 | await using network = await new Network().start();
const zooKeeperHost = "zookeeper";
const zooKeeperPort = 2181;
await using _ = await new GenericContainer("confluentinc/cp-zookeeper:5.5.4")
.withNetwork(network)
.withNetworkAliases(zooKeeperHost)
.withEnvironment({ ZOOKEEPER_CLIENT_PORT: zooKeeperPort.toString() })
.withExposedPorts(zooKeeperPort)
.start();
await using container = await new KafkaContainer(IMAGE)
.withNetwork(network)
.withZooKeeper(zooKeeperHost, zooKeeperPort)
.start();
|
With Kraft
| await using container = await new KafkaContainer(IMAGE).withKraft().start();
|