Kube Cloud Pt5 | Create an Event Publisher

Kube Cloud Pt5 | Create an Event Publisher

Kube Cloud Pt5 | Streaming Events with Kafka

full course
  1. Kube Cloud Pt5 | Create a Consumer Service
  2. Kube Cloud Pt5 | Create an Event Publisher
  3. Kube Cloud Pt5 | Create an Event Consumer

We’re going to be updating cloud-application to remove the mongodb repository logic and replace it with an event publisher.

Create a new branch from main

git checkout -b migrate

Update the pom.xml first, remove the mongodb dependencies

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-mongodb</artifactId>
        </dependency>
...
        <dependency>
            <groupId>org.testcontainers</groupId>
            <artifactId>mongodb</artifactId>
            <version>1.16.3</version>
            <scope>test</scope>
        </dependency>

and add the spring cloud streaming dependencies and the kafka binder

        <!-- Streams and Kafka -->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream</artifactId>
            <type>test-jar</type>
            <classifier>test-binder</classifier>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-binder-kafka</artifactId>
        </dependency>

What we’re doing is adding the streaming functionality and support test frameworks, but also adding the platform specific binding classes needed for kafka. Streaming is generic and will work across multiple platforms (like kafka and rabbitmq). However, in order to actually work against confluent, we need the kafka adapters (the binding classes).

Next, I globally refactored my Message class to be called MessageModel so that I wouldn’t conflict with the Message classes that spring cloud stream provides. I’ll be using that class name going forward.

Next, make a new MessageEvent object under com.bullyrooks.cloud_application.messaging.dto

package com.bullyrooks.cloud_application.messaging.dto;

import lombok.Data;

@Data
public class MessageEvent {
    private String messageId;
    private String firstName;
    private String lastName;
    private String message;
}

This is going to be the dto used to transfer the event inside the message (as a json payload)

we also need a mapper. Create MessageEventMapper in com.bullyrooks.cloud_application.messaging.mapper

package com.bullyrooks.cloud_application.messaging.mapper;

import com.bullyrooks.cloud_application.messaging.dto.MessageEvent;
import com.bullyrooks.cloud_application.service.model.MessageModel;
import org.mapstruct.Mapper;
import org.mapstruct.factory.Mappers;

@Mapper
public interface MessageEventMapper {
    MessageEventMapper INSTANCE = Mappers.getMapper(MessageEventMapper.class);

    MessageEvent modelToEvent(MessageModel model);

    MessageModel eventToModel(MessageEvent returnEvent);
}

Next, lets delete the packages and classes in com.bullyrooks.cloud_application_repository (or save them for later since we’re going to move them to the new service)

Next, we’re going to update the service class (MessageService) to publish an event instead of storing in the repository


...
    //new attribute to inject
    StreamBridge streamBridge;
...
    @Autowired
    public MessageService(MessageGeneratorClient messageGeneratorClient,
                          StreamBridge streamBridge,
                          MeterRegistry logzioMeterRegistry){
        this.messageGeneratorClient = messageGeneratorClient;
        this.streamBridge = streamBridge;
        this.logzioMeterRegistry = logzioMeterRegistry;
        initCounters();
    }
...
    public MessageModel saveMessage(MessageModel messageModel) {

        msgCount.increment();
        if (StringUtils.isEmpty(messageModel.getMessage())) {

            genMsgCount.increment();
            log.info("No message, retrieve from message generator");
            MessageResponseDTO dto = messageGeneratorClient.getMessage();
            messageModel.setMessage(dto.getMessage());
            genMsgSuccess.increment();
            log.info("retrieved message: {}", messageModel.getMessage());
        }

        log.info("publishing event: {}", messageModel);
        streamBridge.send("message.created",
                MessageEventMapper.INSTANCE.modelToEvent(messageModel));
        return messageModel;
    }

Now we have to add the configuration so that our topics and binders can connect to confluent.

spring:
  application:
    name: cloud-application
  cloud:
    stream:
      function:
        bindings:
          messageEvent-out-0:
            destination: message.created
  kafka:
    properties:
      sasl.mechanism: PLAIN
      bootstrap.servers: pkc-2396y.us-east-1.aws.confluent.cloud:9092
      security.protocol: SASL_SSL

we’re missing a key piece here which is the security credentials, so when you run make sure to add a -D property to configure them:

And the correct credentials can be found on confluent. There’s an option available when you create the cluster, I think. But they can be recreated this way:

From cluster overview, choose configure a client

Doesn’t matter which client, but we can pick java and you should get a screen like this:

If you pick create kafka cluster api key, you should be given the option to get a new api key:

You can copy/past the key and secret into your configuration from here. Make sure to have the leading/training " (double quote) and surround the key/secret with the ' (single quote) as these will help escape any control characters.

Now we need to update the configuration so that our tests will work, add this to the application-test.yaml

spring:
  cloud:
    stream:
      bindings:
        messageEvent-out-0:
          destination: message.created 

and next the actual test. Add this annotation to allow spring to inject a test binder.

@Import(TestChannelBinderConfiguration.class)

Add this field so that we can access the test binder topic

    @Autowired
    OutputDestination outputDestination;

remove the repository injection and asserts and replace with the topic assertions. This is what the final test should look like:

package com.bullyrooks.cloud_application.controller;

import com.bullyrooks.cloud_application.controller.dto.CreateMessageRequestDTO;
import com.bullyrooks.cloud_application.controller.dto.CreateMessageResponseDTO;
import com.bullyrooks.cloud_application.message_generator.client.MessageGeneratorClient;
import com.bullyrooks.cloud_application.message_generator.client.dto.MessageResponseDTO;
import com.bullyrooks.cloud_application.messaging.dto.MessageEvent;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.github.javafaker.Faker;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.autoconfigure.web.servlet.AutoConfigureMockMvc;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.mock.mockito.MockBean;
import org.springframework.boot.web.server.LocalServerPort;
import org.springframework.cloud.stream.binder.test.OutputDestination;
import org.springframework.cloud.stream.binder.test.TestChannelBinderConfiguration;
import org.springframework.context.annotation.Import;
import org.springframework.http.ResponseEntity;
import org.springframework.messaging.Message;
import org.springframework.test.context.junit.jupiter.SpringExtension;
import org.springframework.web.client.RestTemplate;
import org.springframework.web.util.UriComponents;
import org.springframework.web.util.UriComponentsBuilder;

import java.io.IOException;
import java.nio.charset.StandardCharsets;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.when;

@ExtendWith(SpringExtension.class)
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
@Slf4j
@AutoConfigureMockMvc
@Import(TestChannelBinderConfiguration.class)
public class MessageControllerTest {

    private final String MESSAGE_PATH = "/message";

    @LocalServerPort
    int randomServerPort;

    @MockBean
    MessageGeneratorClient messageGeneratorClient;

    @Autowired
    OutputDestination outputDestination;

    Faker faker = new Faker();

    @Test
    void testSaveMessage() throws IOException {
        Long userId = 1l;

        //given
        CreateMessageRequestDTO request = CreateMessageRequestDTO
                .builder()
                .firstName(faker.name().firstName())
                .lastName(faker.name().lastName())
                .message(faker.gameOfThrones().quote())
                .build();

        //when
        RestTemplate restTemplate = new RestTemplate();
        String baseUrl = "http://localhost:" + randomServerPort + MESSAGE_PATH;
        UriComponents builder = UriComponentsBuilder.fromHttpUrl(baseUrl)
                .build();

        ResponseEntity<CreateMessageResponseDTO> result = restTemplate.postForEntity(
                builder.toUri(), request, CreateMessageResponseDTO.class);

        //then
        CreateMessageResponseDTO dto = result.getBody();
        assertEquals(request.getFirstName(), dto.getFirstName());
        assertEquals(request.getLastName(), dto.getLastName());
        assertEquals(request.getMessage(), dto.getMessage());

        Message<byte[]> receievedMessage = outputDestination.receive(1000,"message.created");
        String messageStr = new String(receievedMessage.getPayload(), StandardCharsets.UTF_8);
        log.info("message string: {}", messageStr);
        ObjectMapper mapper = new ObjectMapper();
        MessageEvent event = mapper.reader().readValue(messageStr, MessageEvent.class);
        assertEquals(request.getFirstName(), event.getFirstName());
        assertEquals(request.getLastName(), event.getLastName());
        assertEquals(request.getMessage(), event.getMessage());

    }
    @Test
    void testGetReturnsMessageIfMissing() throws InterruptedException, IOException {
        Long userId = 1l;

        //given
        CreateMessageRequestDTO request = CreateMessageRequestDTO
                .builder()
                .firstName(faker.name().firstName())
                .lastName(faker.name().lastName())
                .build();

        when(messageGeneratorClient.getMessage()).thenReturn(
                MessageResponseDTO.builder()
                .message(faker.gameOfThrones().quote())
                .build());

        //when
        RestTemplate restTemplate = new RestTemplate();
        String baseUrl = "http://localhost:" + randomServerPort + MESSAGE_PATH;
        UriComponents builder = UriComponentsBuilder.fromHttpUrl(baseUrl)
                .build();

        ResponseEntity<CreateMessageResponseDTO> result = restTemplate.postForEntity(
                builder.toUri(), request, CreateMessageResponseDTO.class);

        //then
        CreateMessageResponseDTO dto = result.getBody();
        assertEquals(request.getFirstName(), dto.getFirstName());
        assertEquals(request.getLastName(), dto.getLastName());
        assertTrue(StringUtils.isNotBlank(dto.getMessage()));

        Message<byte[]> receievedMessage = outputDestination.receive(1000,"message.created");
        String messageStr = new String(receievedMessage.getPayload(), StandardCharsets.UTF_8);
        log.info("message string: {}", messageStr);
        ObjectMapper mapper = new ObjectMapper();
        MessageEvent event = mapper.reader().readValue(messageStr, MessageEvent.class);
        assertEquals(request.getFirstName(), event.getFirstName());
        assertEquals(request.getLastName(), event.getLastName());
        assertTrue(StringUtils.isNotBlank(event.getMessage()));
    }

}
utDestination.receive(1000,"message.created");
        String messageStr = new String(receievedMessage.getPayload(), StandardCharsets.UTF_8);
        log.info("message string: {}", messageStr);
        ObjectMapper mapper = new ObjectMapper();
        MessageEvent event = mapper.reader().readValue(messageStr, MessageEvent.class);
        assertEquals(request.getFirstName(), event.getFirstName());
        assertEquals(request.getLastName(), event.getLastName());
        assertTrue(StringUtils.isNotBlank(event.getMessage()));
    }

}

You can see that we’re pulling a message off the topic, decoding it into a json message which we transform into an object before we run our asserts.

The final thing we’ll need to do is create a new secret and environment variable in our helm deployment, so that we can connect to confluent when we deploy to okteto

kubectl create secret generic confluent-secrets --from-literal=SPRING_KAFKA_PROPERTIES_SASL_JAAS_CONFIG="org.apache.kafka.common.security.plain.PlainLoginModule   required username='...'  password='...';"

and add this environment configuration to the deployment.yaml file

          - name: SPRING_KAFKA_PROPERTIES_SASL_JAAS_CONFIG
            valueFrom:
              secretKeyRef:
                name: confluent-secrets
                key: SPRING_KAFKA_PROPERTIES_SASL_JAAS_CONFIG

Testing

We should be able to confirm that we can publish messages now. Navigate to the topic message page in confluent

Start up the application locally and hit the cloud-application endpoint with postman

You should see a message appear in the topic in confluent

push up to the feature branch and if it builds successfully merge to main

Run the test again against the cloud instance and confirm you can still see the message being produced

0 comments on “Kube Cloud Pt5 | Create an Event PublisherAdd yours →

Leave a Reply

Your email address will not be published. Required fields are marked *