How one can Set Up Kafka Integration Check – Grape Up

Do you think about unit testing as not sufficient answer for protecting the appliance’s reliability and stability? Are you afraid that by some means or someplace there’s a potential bug hiding within the assumption that unit assessments ought to cowl all instances? And in addition is mocking Kafka not sufficient for venture necessities? If even one reply is  ‘sure’, then welcome to a pleasant and straightforward information on the right way to arrange Integration Exams for Kafka utilizing TestContainers and Embedded Kafka for Spring!

What’s TestContainers?

TestContainers is an open-source Java library specialised in offering all wanted options for the mixing and testing of exterior sources. It signifies that we’re capable of mimic an precise database, internet server, and even an occasion bus surroundings and deal with that as a dependable place to check app performance. All these fancy options are hooked into docker photos, outlined as containers. Do we have to take a look at the database layer with precise MongoDB? No worries, we have now a take a look at container for that. We can’t additionally overlook about UI assessments – Selenium Container will do something that we really need.
In our case, we’ll give attention to Kafka Testcontainer.

What’s Embedded Kafka?

Because the identify suggests, we’re going to take care of an in-memory Kafka occasion, prepared for use as a standard dealer with full performance. It permits us to work with producers and shoppers, as ordinary, making our integration assessments light-weight. 

Earlier than we begin

The idea for our take a look at is easy – I want to take a look at Kafka client and producer utilizing two totally different approaches and examine how we are able to make the most of them in precise instances. 

Kafka Messages are serialized utilizing Avro schemas.

Embedded Kafka – Producer Check

The idea is simple – let’s create a easy venture with the controller, which invokes a service technique to push a Kafka Avro serialized message.


implementation "org.apache.avro:avro:1.10.1"
implementation 'org.springframework.boot:spring-boot-starter-validation'
implementation 'org.springframework.kafka:spring-kafka'

implementation 'org.projectlombok:lombok:1.18.16'

compileOnly 'org.projectlombok:lombok'
annotationProcessor 'org.projectlombok:lombok'
testImplementation 'org.springframework.boot:spring-boot-starter-test'
testImplementation 'org.springframework.kafka:spring-kafka-test'

Additionally price mentioning incredible plugin for Avro. Right here plugins part:

	id 'org.springframework.boot' model '2.6.8'
	id 'io.spring.dependency-management' model '1.0.11.RELEASE'
	id 'java'
	id "com.github.davidmc24.gradle.plugin.avro" model "1.3.0"

Avro Plugin helps schema auto-generating. This can be a must-have.

Hyperlink to plugin:

Now let’s outline the Avro schema:

  "namespace": "com.grapeup.myawesome.myawesomeproducer",
  "kind": "file",
  "identify": "RegisterRequest",
  "fields": [
    "name": "id", "type": "long",
    "name": "address", "type": "string", "": "String"


Our ProducerService will likely be centered solely on sending messages to Kafka utilizing a template, nothing thrilling about that half. Most important performance might be achieved simply utilizing this line:

ListenableFuture<SendResult<String, RegisterRequest>> future = this.kafkaTemplate.ship("register-request", kafkaMessage);

We will’t overlook about take a look at properties:

  most important:
    allow-bean-definition-overriding: true
      group-id: group_id
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.frequent.serialization.StringDeserializer
      value-deserializer: com.grapeup.myawesome.myawesomeconsumer.frequent.CustomKafkaAvroDeserializer
      auto.register.schemas: true
      key-serializer: org.apache.kafka.frequent.serialization.StringSerializer
      value-serializer: com.grapeup.myawesome.myawesomeconsumer.frequent.CustomKafkaAvroSerializer
      particular.avro.reader: true

As we see within the talked about take a look at properties, we declare a customized deserializer/serializer for KafkaMessages. It’s extremely really useful to make use of Kafka with Avro – don’t let JSONs preserve object construction, let’s use civilized mapper and object definition like Avro.


public class CustomKafkaAvroSerializer extends KafkaAvroSerializer 
    public CustomKafkaAvroSerializer() 
        tremendous.schemaRegistry = new MockSchemaRegistryClient();

    public CustomKafkaAvroSerializer(SchemaRegistryClient consumer) 
        tremendous(new MockSchemaRegistryClient());

    public CustomKafkaAvroSerializer(SchemaRegistryClient consumer, Map<String, ?> props) 
        tremendous(new MockSchemaRegistryClient(), props);


public class CustomKafkaAvroSerializer extends KafkaAvroSerializer 
    public CustomKafkaAvroSerializer() 
        tremendous.schemaRegistry = new MockSchemaRegistryClient();

    public CustomKafkaAvroSerializer(SchemaRegistryClient consumer) 
        tremendous(new MockSchemaRegistryClient());

    public CustomKafkaAvroSerializer(SchemaRegistryClient consumer, Map<String, ?> props) 
        tremendous(new MockSchemaRegistryClient(), props);

And we have now all the things to start out writing our take a look at.

@ActiveProfiles("take a look at")
@EmbeddedKafka(partitions = 1, subjects = "register-request")
class ProducerControllerTest {

All we have to do is add @EmbeddedKafka annotation with listed subjects and partitions. Software Context will boot Kafka Dealer with offered configuration similar to that. Needless to say @TestInstance must be used with particular consideration. Lifecycle.PER_CLASS will keep away from creating the identical objects/context for every take a look at technique. Value checking if assessments are too time-consuming.

Shopper<String, RegisterRequest> consumerServiceTest;
void setUp() 
DefaultKafkaConsumerFactory<String, RegisterRequest> client = new DefaultKafkaConsumerFactory<>(kafkaProperties.buildConsumerProperties();

consumerServiceTest = client.createConsumer();

Right here we are able to declare the take a look at client, primarily based on the Avro schema return kind. All Kafka properties are already offered within the .yml file. That client will likely be used as a examine if the producer truly pushed a message.

Right here is the precise take a look at technique:

void whenValidInput_therReturns200() throws Exception 
        RegisterRequestDto request = RegisterRequestDto.builder()

        mockMvc.carry out(
                put up("/register-request")
                      .content material(objectMapper.writeValueAsBytes(request)))

      ConsumerRecord<String, RegisterRequest> consumedRegisterRequest =  KafkaTestUtils.getSingleRecord(consumerServiceTest, TOPIC_NAME);

        RegisterRequest valueReceived = consumedRegisterRequest.worth();

        assertEquals(12, valueReceived.getId());
        assertEquals("tempAddress", valueReceived.getAddress());

To start with, we use MockMvc to carry out an motion on our endpoint. That endpoint makes use of ProducerService to push messages to Kafka. KafkaConsumer is used to confirm if the producer labored as anticipated. And that’s it – we have now a totally working take a look at with embedded Kafka.

Check Containers – Shopper Check

TestContainers are nothing else like impartial docker photos prepared for being dockerized. The next take a look at situation will likely be enhanced by a MongoDB picture. Why not maintain our knowledge within the database proper after something occurred in Kafka circulation?

Dependencies aren’t a lot totally different than within the earlier instance. The next steps are wanted for take a look at containers:

testImplementation 'org.testcontainers:junit-jupiter'
	testImplementation 'org.testcontainers:kafka'
	testImplementation 'org.testcontainers:mongodb'

	set('testcontainersVersion', "1.17.1")

		mavenBom "org.testcontainers:testcontainers-bom:$testcontainersVersion"

Let’s focus now on the Shopper half. The take a look at case will likely be easy – one client service will likely be chargeable for getting the Kafka message and storing the parsed payload within the MongoDB assortment. All that we have to find out about KafkaListeners, for now, is that annotation:

@KafkaListener(subjects = "register-request")

By the performance of the annotation processor, KafkaListenerContainerFactory will likely be accountable to create a listener on our technique. From this second our technique will react to any upcoming Kafka message with the talked about matter.

Avro serializer and deserializer configs are the identical as within the earlier take a look at.

Concerning TestContainer, we must always begin with the next annotations:

@ActiveProfiles("take a look at")
public class AbstractIntegrationTest {

Throughout startup, all configured TestContainers modules will likely be activated. It means that we are going to get entry to the total working surroundings of the chosen supply. As instance:

personal KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;

public static KafkaContainer kafkaContainer = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:6.2.1"));

static MongoDBContainer mongoDBContainer = new MongoDBContainer("mongo:4.4.2").withExposedPorts(27017);

On account of booting the take a look at, we are able to count on two docker containers to start out with the offered configuration.

What is de facto essential for the mongo container – it provides us full entry to the database utilizing only a easy connection uri. With such a characteristic, we’re ready to have a look what’s the present state in our collections, even throughout debug mode and ready breakpoints.
Have a look additionally on the Ryuk container – it really works like overwatch and checks if our containers have began appropriately.

And right here is the final a part of the configuration:

static void dataSourceProperties(DynamicPropertyRegistry registry) 
   registry.add("spring.kafka.bootstrap-servers", kafkaContainer::getBootstrapServers);
   registry.add("spring.kafka.client.bootstrap-servers", kafkaContainer::getBootstrapServers);
   registry.add("spring.kafka.producer.bootstrap-servers", kafkaContainer::getBootstrapServers);
   registry.add("spring.knowledge.mongodb.uri", mongoDBContainer::getReplicaSetUrl);



public void beforeTest() 

           messageListenerContainer -> 
                       .waitForAssignment(messageListenerContainer, 1);


static void tearDown() 

DynamicPropertySource provides us the choice to set all wanted surroundings variables through the take a look at lifecycle. Strongly wanted for any config functions for TestContainers. Additionally, beforeTestClass kafkaListenerEndpointRegistry waits for every listener to get anticipated partitions throughout container startup.

And the final a part of the Kafka take a look at containers journey – the primary physique of the take a look at:

public void containerStartsAndPublicPortIsAvailable() throws Exception 
   writeToTopic("register-request", RegisterRequest.newBuilder().setId(123).setAddress("dummyAddress").construct());

   //Look ahead to KafkaListener
   Assertions.assertEquals(1, taxiRepository.findAll().measurement());

personal KafkaProducer<String, RegisterRequest> createProducer() 
   return new KafkaProducer<>(kafkaProperties.buildProducerProperties());

personal void writeToTopic(String topicName, RegisterRequest... registerRequests) 

   attempt (KafkaProducer<String, RegisterRequest> producer = createProducer())
               .forEach(registerRequest -> 
                           ProducerRecord<String, RegisterRequest> file = new ProducerRecord<>(topicName, registerRequest);

The customized producer is chargeable for writing our message to KafkaBroker. Additionally, it’s endorsed to provide a while for shoppers to deal with messages correctly. As we see, the message was not simply consumed by the listener, but in addition saved within the MongoDB assortment.


As we are able to see, present options for integration assessments are fairly straightforward to implement and preserve in tasks. There isn’t any level in protecting simply unit assessments and relying on all traces lined as an indication of code/logic high quality. Now the query is, ought to we use an Embedded answer or TestContainers? I recommend to begin with specializing in the phrase “Embedded”. As an ideal integration take a look at, we need to get an virtually best copy of the manufacturing surroundings with all properties/options included. In-memory options are good, however principally, not sufficient for giant enterprise tasks. Undoubtedly, the benefit of Embedded providers is the straightforward option to implement such assessments and preserve configuration, simply when something occurs in reminiscence.
TestContainers on the first sight may appear to be overkill, however they offer us a very powerful characteristic, which is a separate surroundings. We don’t need to even depend on current docker photos – if we wish we are able to use customized ones. This can be a enormous enchancment for potential take a look at situations.
What about Jenkins? There isn’t any cause to be afraid additionally to make use of TestContainers in Jenkins. I firmly advocate checking TestContainers documentation on how simply we are able to arrange the configuration for Jenkins brokers.
To sum up – if there is no such thing as a blocker or any undesirable situation for utilizing TestContainers, then don’t hesitate. It’s at all times good to maintain all providers managed and secured with integration take a look at contracts.