Kube Cloud Pt5 | Streaming Events with Kafka

Full course:
- Kube Cloud Pt5 | Create a Consumer Service
- Kube Cloud Pt5 | Create an Event Publisher
- Kube Cloud Pt5 | Create an Event Consumer
We’re going to update cloud-application to remove the MongoDB repository logic and replace it with an event publisher.
Create a new branch from main:
git checkout -b migrate
Update the pom.xml first: remove the MongoDB dependencies
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-mongodb</artifactId>
</dependency>
...
<dependency>
    <groupId>org.testcontainers</groupId>
    <artifactId>mongodb</artifactId>
    <version>1.16.3</version>
    <scope>test</scope>
</dependency>
and add the Spring Cloud Stream dependencies and the Kafka binder:
<!-- Streams and Kafka -->
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream</artifactId>
    <type>test-jar</type>
    <classifier>test-binder</classifier>
    <scope>test</scope>
</dependency>
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>
What we’re doing is adding the streaming functionality and the supporting test framework, but also the platform-specific binding classes needed for Kafka. Spring Cloud Stream is generic and will work across multiple platforms (like Kafka and RabbitMQ). However, in order to actually work against Confluent, we need the Kafka adapters (the binder classes).
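One assumption worth calling out: these entries don’t declare versions, so the project needs the spring-cloud-dependencies BOM in dependencyManagement. It’s likely already there from earlier parts of this series; if not, a minimal sketch looks like the following (the ${spring-cloud.version} property is an assumption; pick the release train matching your Spring Boot version):

<dependencyManagement>
    <dependencies>
        <!-- import the Spring Cloud BOM so the stream/binder artifacts resolve versions -->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-dependencies</artifactId>
            <version>${spring-cloud.version}</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>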
Next, I globally refactored my Message class to be called MessageModel so that it wouldn’t conflict with the Message class that Spring Cloud Stream provides. I’ll be using that class name going forward.
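For reference, the renamed model ends up looking something like this (a minimal sketch; the fields are assumed to mirror the event DTO below, and the package comes from the mapper imports later on):

package com.bullyrooks.cloud_application.service.model;

import lombok.Data;

// renamed from Message to avoid clashing with org.springframework.messaging.Message
@Data
public class MessageModel {
    private String messageId;
    private String firstName;
    private String lastName;
    private String message;
}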
Next, make a new MessageEvent object under com.bullyrooks.cloud_application.messaging.dto:
package com.bullyrooks.cloud_application.messaging.dto;

import lombok.Data;

@Data
public class MessageEvent {
    private String messageId;
    private String firstName;
    private String lastName;
    private String message;
}
This is the DTO used to transfer the event inside the message (as a JSON payload).
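On the topic, the serialized payload would look something like this (illustrative values; messageId will likely be null at publish time since nothing populates it yet):

{"messageId":null,"firstName":"Arya","lastName":"Stark","message":"Winter is coming"}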
We also need a mapper. Create MessageEventMapper in com.bullyrooks.cloud_application.messaging.mapper:
package com.bullyrooks.cloud_application.messaging.mapper;

import com.bullyrooks.cloud_application.messaging.dto.MessageEvent;
import com.bullyrooks.cloud_application.service.model.MessageModel;
import org.mapstruct.Mapper;
import org.mapstruct.factory.Mappers;

@Mapper
public interface MessageEventMapper {
    MessageEventMapper INSTANCE = Mappers.getMapper(MessageEventMapper.class);

    MessageEvent modelToEvent(MessageModel model);

    MessageModel eventToModel(MessageEvent returnEvent);
}
Next, let’s delete the packages and classes in com.bullyrooks.cloud_application.repository (or save them for later, since we’re going to move them to the new service).
Next, we’re going to update the service class (MessageService) to publish an event instead of storing it in the repository:
...
//new attribute to inject
StreamBridge streamBridge;
...
@Autowired
public MessageService(MessageGeneratorClient messageGeneratorClient,
                      StreamBridge streamBridge,
                      MeterRegistry logzioMeterRegistry) {
    this.messageGeneratorClient = messageGeneratorClient;
    this.streamBridge = streamBridge;
    this.logzioMeterRegistry = logzioMeterRegistry;
    initCounters();
}
...
public MessageModel saveMessage(MessageModel messageModel) {
    msgCount.increment();
    if (StringUtils.isEmpty(messageModel.getMessage())) {
        genMsgCount.increment();
        log.info("No message, retrieve from message generator");
        MessageResponseDTO dto = messageGeneratorClient.getMessage();
        messageModel.setMessage(dto.getMessage());
        genMsgSuccess.increment();
        log.info("retrieved message: {}", messageModel.getMessage());
    }
    log.info("publishing event: {}", messageModel);
    streamBridge.send("message.created",
            MessageEventMapper.INSTANCE.modelToEvent(messageModel));
    return messageModel;
}
Now we have to add the configuration so that our topics and binders can connect to Confluent.
spring:
  application:
    name: cloud-application
  cloud:
    stream:
      bindings:
        messageEvent-out-0:
          destination: message.created
  kafka:
    properties:
      sasl.mechanism: PLAIN
      bootstrap.servers: pkc-2396y.us-east-1.aws.confluent.cloud:9092
      security.protocol: SASL_SSL
We’re missing a key piece here, which is the security credentials, so when you run locally make sure to add a -D property to configure them.
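As a sketch, that property is the same one the Helm secret maps to later (spring.kafka.properties.sasl.jaas.config); the jar path and the placeholder key/secret here are assumptions, substitute your own:

java -Dspring.kafka.properties.sasl.jaas.config="org.apache.kafka.common.security.plain.PlainLoginModule required username='<API_KEY>' password='<API_SECRET>';" -jar target/cloud-application.jar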
And the correct credentials can be found in Confluent. There’s an option available when you create the cluster, I think, but they can also be recreated this way:
From the cluster overview, choose to configure a client.
It doesn’t matter which client, but we can pick Java, and you should get a screen like this:
If you pick Create Kafka cluster API key, you should be given the option to get a new API key:
You can copy/paste the key and secret into your configuration from here. Make sure to keep the leading/trailing " (double quote) and surround the key/secret with ' (single quotes), as these will help escape any control characters.
Now we need to update the configuration so that our tests will work. Add this to application-test.yaml:
spring:
  cloud:
    stream:
      bindings:
        messageEvent-out-0:
          destination: message.created
And next, the actual test. Add this annotation to allow Spring to inject a test binder:
@Import(TestChannelBinderConfiguration.class)
Add this field so that we can access the test binder topic:
@Autowired
OutputDestination outputDestination;
Remove the repository injection and asserts and replace them with the topic assertions. This is what the final test should look like:
package com.bullyrooks.cloud_application.controller;

import com.bullyrooks.cloud_application.controller.dto.CreateMessageRequestDTO;
import com.bullyrooks.cloud_application.controller.dto.CreateMessageResponseDTO;
import com.bullyrooks.cloud_application.message_generator.client.MessageGeneratorClient;
import com.bullyrooks.cloud_application.message_generator.client.dto.MessageResponseDTO;
import com.bullyrooks.cloud_application.messaging.dto.MessageEvent;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.github.javafaker.Faker;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.autoconfigure.web.servlet.AutoConfigureMockMvc;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.mock.mockito.MockBean;
import org.springframework.boot.web.server.LocalServerPort;
import org.springframework.cloud.stream.binder.test.OutputDestination;
import org.springframework.cloud.stream.binder.test.TestChannelBinderConfiguration;
import org.springframework.context.annotation.Import;
import org.springframework.http.ResponseEntity;
import org.springframework.messaging.Message;
import org.springframework.test.context.junit.jupiter.SpringExtension;
import org.springframework.web.client.RestTemplate;
import org.springframework.web.util.UriComponents;
import org.springframework.web.util.UriComponentsBuilder;

import java.io.IOException;
import java.nio.charset.StandardCharsets;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.when;

@ExtendWith(SpringExtension.class)
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
@Slf4j
@AutoConfigureMockMvc
@Import(TestChannelBinderConfiguration.class)
public class MessageControllerTest {

    private final String MESSAGE_PATH = "/message";

    @LocalServerPort
    int randomServerPort;

    @MockBean
    MessageGeneratorClient messageGeneratorClient;

    @Autowired
    OutputDestination outputDestination;

    Faker faker = new Faker();

    @Test
    void testSaveMessage() throws IOException {
        //given
        CreateMessageRequestDTO request = CreateMessageRequestDTO
                .builder()
                .firstName(faker.name().firstName())
                .lastName(faker.name().lastName())
                .message(faker.gameOfThrones().quote())
                .build();

        //when
        RestTemplate restTemplate = new RestTemplate();
        String baseUrl = "http://localhost:" + randomServerPort + MESSAGE_PATH;
        UriComponents builder = UriComponentsBuilder.fromHttpUrl(baseUrl)
                .build();
        ResponseEntity<CreateMessageResponseDTO> result = restTemplate.postForEntity(
                builder.toUri(), request, CreateMessageResponseDTO.class);

        //then
        CreateMessageResponseDTO dto = result.getBody();
        assertEquals(request.getFirstName(), dto.getFirstName());
        assertEquals(request.getLastName(), dto.getLastName());
        assertEquals(request.getMessage(), dto.getMessage());

        // pull the published event off the test binder destination and verify the payload
        Message<byte[]> receivedMessage = outputDestination.receive(1000, "message.created");
        String messageStr = new String(receivedMessage.getPayload(), StandardCharsets.UTF_8);
        log.info("message string: {}", messageStr);
        ObjectMapper mapper = new ObjectMapper();
        MessageEvent event = mapper.readValue(messageStr, MessageEvent.class);
        assertEquals(request.getFirstName(), event.getFirstName());
        assertEquals(request.getLastName(), event.getLastName());
        assertEquals(request.getMessage(), event.getMessage());
    }

    @Test
    void testGetReturnsMessageIfMissing() throws InterruptedException, IOException {
        //given
        CreateMessageRequestDTO request = CreateMessageRequestDTO
                .builder()
                .firstName(faker.name().firstName())
                .lastName(faker.name().lastName())
                .build();
        when(messageGeneratorClient.getMessage()).thenReturn(
                MessageResponseDTO.builder()
                        .message(faker.gameOfThrones().quote())
                        .build());

        //when
        RestTemplate restTemplate = new RestTemplate();
        String baseUrl = "http://localhost:" + randomServerPort + MESSAGE_PATH;
        UriComponents builder = UriComponentsBuilder.fromHttpUrl(baseUrl)
                .build();
        ResponseEntity<CreateMessageResponseDTO> result = restTemplate.postForEntity(
                builder.toUri(), request, CreateMessageResponseDTO.class);

        //then
        CreateMessageResponseDTO dto = result.getBody();
        assertEquals(request.getFirstName(), dto.getFirstName());
        assertEquals(request.getLastName(), dto.getLastName());
        assertTrue(StringUtils.isNotBlank(dto.getMessage()));

        // the generated message should also show up in the published event
        Message<byte[]> receivedMessage = outputDestination.receive(1000, "message.created");
        String messageStr = new String(receivedMessage.getPayload(), StandardCharsets.UTF_8);
        log.info("message string: {}", messageStr);
        ObjectMapper mapper = new ObjectMapper();
        MessageEvent event = mapper.readValue(messageStr, MessageEvent.class);
        assertEquals(request.getFirstName(), event.getFirstName());
        assertEquals(request.getLastName(), event.getLastName());
        assertTrue(StringUtils.isNotBlank(event.getMessage()));
    }
}
You can see that we’re pulling a message off the topic and decoding it into a JSON string, which we deserialize into an object before we run our asserts.
The final thing we’ll need to do is create a new secret and environment variable in our Helm deployment, so that we can connect to Confluent when we deploy to Okteto:
kubectl create secret generic confluent-secrets --from-literal=SPRING_KAFKA_PROPERTIES_SASL_JAAS_CONFIG="org.apache.kafka.common.security.plain.PlainLoginModule required username='...' password='...';"
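If you want to double-check that the secret landed (the value comes back base64-encoded), something like this works:

kubectl get secret confluent-secrets -o yaml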
and add this environment configuration to the deployment.yaml file:
- name: SPRING_KAFKA_PROPERTIES_SASL_JAAS_CONFIG
  valueFrom:
    secretKeyRef:
      name: confluent-secrets
      key: SPRING_KAFKA_PROPERTIES_SASL_JAAS_CONFIG
Testing
We should be able to confirm that we can publish messages now. Navigate to the topic's messages page in Confluent.
Start up the application locally and hit the cloud-application endpoint with Postman (a curl equivalent is sketched below).
You should see a message appear in the topic in Confluent.
Run the test again against the cloud instance and confirm you can still see the message being produced.
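For reference, a curl equivalent of the Postman request might look like this (assuming the app is running on the default local port 8080; the field values are just examples):

curl -X POST http://localhost:8080/message \
  -H "Content-Type: application/json" \
  -d '{"firstName":"Arya","lastName":"Stark","message":"Winter is coming"}'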