Kube Cloud Pt5 | Create an Event Consumer
This post is part of the full course, Kube Cloud Pt5 | Streaming Events with Kafka:
- Kube Cloud Pt5 | Create a Consumer Service
- Kube Cloud Pt5 | Create an Event Publisher
- Kube Cloud Pt5 | Create an Event Consumer
Now that we've got messages being published to Kafka, we need to build the consumer that receives those events and stores them in the Mongo database.
Go ahead and create a new repository called message-repository according to the microservice startup course here.
This is the pom.xml that we’ll need:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.6.3</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.bullyrooks</groupId>
    <artifactId>message-repository</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>message-repository</name>
    <description>Stores messages</description>
    <properties>
        <java.version>11</java.version>
        <spring-cloud.version>2021.0.1</spring-cloud.version>
        <org.mapstruct.version>1.4.2.Final</org.mapstruct.version>
        <logstash-logback-encoder.version>6.4</logstash-logback-encoder.version>
        <log4j2.version>2.17.1</log4j2.version>
        <logzio-logback-appender.version>v1.0.25</logzio-logback-appender.version>
        <micrometer-registry-logzio.version>1.0.2</micrometer-registry-logzio.version>
        <javafaker.version>1.0.2</javafaker.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-mongodb</artifactId>
        </dependency>
        <!-- Streams and Kafka -->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-binder-kafka</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream</artifactId>
            <type>test-jar</type>
            <classifier>test-binder</classifier>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-devtools</artifactId>
            <scope>runtime</scope>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <!-- Health Checks and Metrics -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-actuator</artifactId>
        </dependency>
        <!-- mapstruct -->
        <dependency>
            <groupId>org.mapstruct</groupId>
            <artifactId>mapstruct</artifactId>
            <version>${org.mapstruct.version}</version>
        </dependency>
        <dependency>
            <groupId>org.mapstruct</groupId>
            <artifactId>mapstruct-processor</artifactId>
            <version>${org.mapstruct.version}</version>
        </dependency>
        <!-- logging -->
        <dependency>
            <groupId>net.logstash.logback</groupId>
            <artifactId>logstash-logback-encoder</artifactId>
            <version>${logstash-logback-encoder.version}</version>
        </dependency>
        <dependency>
            <groupId>io.logz.logback</groupId>
            <artifactId>logzio-logback-appender</artifactId>
            <version>${logzio-logback-appender.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-aop</artifactId>
        </dependency>
        <dependency>
            <groupId>io.logz.micrometer</groupId>
            <artifactId>micrometer-registry-logzio</artifactId>
            <version>${micrometer-registry-logzio.version}</version>
        </dependency>
        <dependency>
            <groupId>com.github.javafaker</groupId>
            <artifactId>javafaker</artifactId>
            <version>${javafaker.version}</version>
        </dependency>
        <dependency>
            <groupId>de.flapdoodle.embed</groupId>
            <artifactId>de.flapdoodle.embed.mongo</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>
    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>${spring-cloud.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <configuration>
                    <excludes>
                        <exclude>
                            <groupId>org.projectlombok</groupId>
                            <artifactId>lombok</artifactId>
                        </exclude>
                    </excludes>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
You can see we're adding the Spring Cloud Stream libraries as well as the MongoDB ones.
Now make the MessageEvent class in com.bullyrooks.messagerepository.event.dto:
package com.bullyrooks.messagerepository.event.dto;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class MessageEvent {
    private String messageId;
    private String firstName;
    private String lastName;
    private String message;
}
This is the object structure that incoming events will be deserialized into.
Now copy over the MessageRepository, MessageDocument, and MessageDocumentMapper from the previous implementation (you can find them in the GitHub history of the cloud-application project if you didn't stash the changes anywhere). I put them into com.bullyrooks.messagerepository.repository, under document and mapper packages as needed.
There is one change to call out, though: since I don't need a model object (the MessageDocument will work), the mapper maps from the event to the document.
package com.bullyrooks.messagerepository.repository.mapper;

import com.bullyrooks.messagerepository.event.dto.MessageEvent;
import com.bullyrooks.messagerepository.repository.document.MessageDocument;
import org.mapstruct.Mapper;
import org.mapstruct.factory.Mappers;

@Mapper
public interface MessageDocumentMapper {

    MessageDocumentMapper INSTANCE = Mappers.getMapper(MessageDocumentMapper.class);

    MessageDocument eventToDocument(MessageEvent msgEvent);

    MessageEvent documentToEvent(MessageDocument returnDoc);
}
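As a quick sanity check, here's a minimal sketch of how the generated mapper gets used (the values are made up; this isn't code you need to add anywhere):

// MapStruct generates the implementation at compile time; fields with matching
// names (messageId, firstName, lastName, message) are copied across.
MessageEvent event = MessageEvent.builder()
        .firstName("Bully")
        .lastName("Rooks")
        .message("hello")
        .build();
MessageDocument doc = MessageDocumentMapper.INSTANCE.eventToDocument(event);
// doc.getFirstName() is now "Bully"; messageId stays null until Mongo assigns one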
The MessageStorageService in com.bullyrooks.messagerepository.service is very stripped down:
package com.bullyrooks.messagerepository.service;

import com.bullyrooks.messagerepository.config.LoggingEnabled;
import com.bullyrooks.messagerepository.repository.MessageRepository;
import com.bullyrooks.messagerepository.repository.document.MessageDocument;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.MeterRegistry;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
@Slf4j
@LoggingEnabled
public class MessageStorageService {

    MessageRepository messageRepository;
    MeterRegistry logzioMeterRegistry;
    Counter messageSaved;

    @Autowired
    public MessageStorageService(MessageRepository messageRepository,
                                 MeterRegistry logzioMeterRegistry) {
        this.messageRepository = messageRepository;
        this.logzioMeterRegistry = logzioMeterRegistry;
        initCounters();
    }

    private void initCounters() {
        messageSaved = Counter.builder("message.stored.count")
                .description("Number of messages successfully stored in the repository")
                .register(logzioMeterRegistry);
    }

    public MessageDocument saveMessage(MessageDocument msgDoc) {
        log.info("saving document: {}", msgDoc);
        MessageDocument returnDoc = messageRepository.save(msgDoc);
        messageSaved.increment();
        return returnDoc;
    }
}
It's essentially a copy of the one from cloud-application, but it only writes to the repository.
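If you want to exercise this class in isolation, here's a minimal unit-test sketch. It assumes Mockito (which ships with spring-boot-starter-test) and Micrometer's SimpleMeterRegistry; it is not part of the course code:

// Hypothetical unit-test body for MessageStorageService.
MessageRepository repo = Mockito.mock(MessageRepository.class);
MeterRegistry registry = new SimpleMeterRegistry(); // in-memory registry, fine for tests
MessageDocument doc = MessageDocument.builder()
        .firstName("Bully").lastName("Rooks").message("hello").build();
Mockito.when(repo.save(doc)).thenReturn(doc);

MessageStorageService service = new MessageStorageService(repo, registry);
assertEquals(doc, service.saveMessage(doc));
// the success counter should have been incremented exactly once
assertEquals(1.0, registry.get("message.stored.count").counter().count());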
Now let's implement the Spring Cloud Stream consumer. Create a class called MessageEventConsumer in com.bullyrooks.messagerepository.event with this content:
package com.bullyrooks.messagerepository.event;

import com.bullyrooks.messagerepository.event.dto.MessageEvent;
import com.bullyrooks.messagerepository.repository.mapper.MessageDocumentMapper;
import com.bullyrooks.messagerepository.service.MessageStorageService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;

import java.util.function.Consumer;

@Component
@Slf4j
public class MessageEventConsumer {

    @Autowired
    MessageStorageService storageService;

    @Bean
    public Consumer<MessageEvent> consumeMessageEvent() {
        return (eventIn) -> storageService
                .saveMessage(MessageDocumentMapper.INSTANCE.eventToDocument(eventIn));
    }
}
This is all you need: a component that injects the service, plus a @Bean method that returns a function with a special signature. The Consumer<MessageEvent> return type tells Spring Cloud Stream that this function expects a MessageEvent as its input, which is different from a normal method signature where the input sits in the parentheses. Then, for each MessageEvent (eventIn), we call the mapper and hand the result to the storage service. That's all we have to do!
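If the functional style is new, here's a conceptual sketch of what the binder does with the returned function (not code to add anywhere; someEvent is a made-up stand-in for a record read off the topic):

// Conceptual sketch: Spring Cloud Stream deserializes each Kafka record into
// a MessageEvent and invokes the Consumer's accept() method with it.
Consumer<MessageEvent> handler = (eventIn) -> storageService
        .saveMessage(MessageDocumentMapper.INSTANCE.eventToDocument(eventIn));

MessageEvent someEvent = MessageEvent.builder()
        .firstName("Bully")
        .lastName("Rooks")
        .message("hello")
        .build();
handler.accept(someEvent); // roughly what happens once per message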
The application.yaml should look familiar; we've seen the Mongo, health, and Kafka configuration before:
spring:
  application:
    name: message-repository
  cloud:
    stream:
      bindings:
        consumeMessageEvent-in-0:
          destination: message.created
  kafka:
    properties:
      sasl.mechanism: PLAIN
      bootstrap.servers: pkc-2396y.us-east-1.aws.confluent.cloud:9092
      security.protocol: SASL_SSL
  data:
    mongodb:
      uri: mongodb+srv://bullyrooks:${mongodb.password}@bullyrooks.4zqpz.mongodb.net/bullyrooks?retryWrites=true&w=majority
management:
  endpoints:
    web:
      exposure:
        include: "*"
  endpoint:
    health:
      probes:
        enabled: true
      show-details: always
      group:
        readiness:
          include: "readinessState,mongo,messageGenerator"
  metrics:
    enabled: true
logging:
  level:
    root: INFO
    com.bullyrooks: DEBUG
    io.micrometer.logzio: WARN
logzio:
  metrics:
    url: https://listener.logz.io:8053
    registry:
      mock: false
The unique part here is that we're defining an in binding for the topic, and we need to use the same topic name (destination) that cloud-application is publishing to. The binding name follows Spring Cloud Stream's <functionName>-in-<index> convention, so consumeMessageEvent-in-0 is the first (index 0) input of our consumeMessageEvent bean. Note that the sasl.jaas.config holding the actual Confluent credentials isn't in this file; it gets injected as an environment variable by the helm deployment shown below.
The MessageEventConsumerTest in src/test/java/com/bullyrooks/messagerepository/event is going to look a little new, though:
package com.bullyrooks.messagerepository.event;

import com.bullyrooks.messagerepository.event.dto.MessageEvent;
import com.bullyrooks.messagerepository.repository.MessageRepository;
import com.bullyrooks.messagerepository.repository.document.MessageDocument;
import com.github.javafaker.Faker;
import com.github.javafaker.service.FakeValuesService;
import com.github.javafaker.service.RandomService;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.autoconfigure.data.mongo.AutoConfigureDataMongo;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.cloud.stream.binder.test.InputDestination;
import org.springframework.cloud.stream.binder.test.TestChannelBinderConfiguration;
import org.springframework.context.annotation.Import;
import org.springframework.data.domain.Example;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.GenericMessage;
import org.springframework.test.context.ActiveProfiles;
import org.springframework.test.context.junit.jupiter.SpringExtension;

import java.util.Locale;

import static org.junit.jupiter.api.Assertions.assertEquals;

@ExtendWith(SpringExtension.class)
@SpringBootTest
@AutoConfigureDataMongo
@Slf4j
@ActiveProfiles("test")
@Import(TestChannelBinderConfiguration.class)
public class MessageEventConsumerTest {

    @Autowired
    MessageRepository messageRepository;

    FakeValuesService fakesvc = new FakeValuesService(
            new Locale("en-US"), new RandomService());
    Faker faker = new Faker();

    @Autowired
    private InputDestination inputDestination;

    @Test
    void testMessageStore() {
        //given
        MessageEvent msgEventIn = MessageEvent.builder()
                .firstName(faker.name().firstName())
                .lastName(faker.name().lastName())
                .message(faker.gameOfThrones().quote()).build();
        Message<MessageEvent> messageIn = new GenericMessage<>(msgEventIn);

        //when
        inputDestination.send(messageIn);

        //then
        Example<MessageDocument> example = Example.of(
                MessageDocument.builder()
                        .firstName(msgEventIn.getFirstName())
                        .lastName(msgEventIn.getLastName())
                        .message(msgEventIn.getMessage())
                        .build());
        MessageDocument doc = messageRepository.findOne(example).get();
        assertEquals(msgEventIn.getFirstName(), doc.getFirstName());
        assertEquals(msgEventIn.getLastName(), doc.getLastName());
        assertEquals(msgEventIn.getMessage(), doc.getMessage());
    }
}
We're using the TestChannelBinderConfiguration to inject an InputDestination this time. The input is a MessageEvent that we wrap in a GenericMessage and post to that destination; the test binder delivers it synchronously, so the consumer has already run by the time we query the repository. Additionally, since we don't have an id to search the repo for, we use an Example query object to find the record we expect to be there.
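One hedged aside: with a single function bean there is only one input binding, so the bare send(...) is unambiguous. If the service ever grows a second input, the test binder can also target a binding by destination name; treat the exact overload below as an assumption to verify against your spring-cloud-stream version:

// Hypothetical variant: route the test message to a specific destination
// rather than relying on a single input binding in the context.
inputDestination.send(messageIn, "message.created");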
The default test for MessageRepositoryApplication needs an update as well: it needs to know about Mongo in order to actually run. Add these annotations:
@SpringBootTest
@AutoConfigureDataMongo
@ActiveProfiles("test")
class MessageRepositoryApplicationTests {

    @Test
    void contextLoads() {
    }
}
The application-test.yaml in the src/test/resources directory needs to know the MongoDB version, so the embedded (flapdoodle) Mongo can spin up for tests:
spring:
  mongodb:
    embedded:
      version: 2.6.10 # Version of Mongo to use.
Finally, add the same environment configuration to the helm deployment.yaml file as we did for cloud-application; make sure the Mongo config gets copied over as well:
- name: SPRING_DATA_MONGODB_URI
  valueFrom:
    secretKeyRef:
      name: mongo-secrets
      key: SPRING_DATA_MONGODB_URI
- name: SPRING_KAFKA_PROPERTIES_SASL_JAAS_CONFIG
  valueFrom:
    secretKeyRef:
      name: confluent-secrets
      key: SPRING_KAFKA_PROPERTIES_SASL_JAAS_CONFIG
Verifying
We could run this locally, but we won't be able to test it directly unless we have a way to push messages onto the Kafka topic. One way to test is to run both cloud-application and message-repository locally; in the logs, you should see message requests generate events that message-repository consumes:
2022-02-27 15:33:23,588 DEBUG [,] c.b.m.c.LoggingAspect: -> MessageStorageService.saveMessage invocation. params: {"msgDoc":{"messageId":null,"firstName":"Bully","lastName":"Rooks","message":"local test 123"}}
2022-02-27 15:33:23,592 INFO [,] c.b.m.s.MessageStorageService: saving document: MessageDocument(messageId=null, firstName=Bully, lastName=Rooks, message=local test 123)
2022-02-27 15:33:23,728 DEBUG [,] c.b.m.c.LoggingAspect: <- MessageStorageService.saveMessage returns:{"messageId":"621bfc3369057f23a56b7d2e","firstName":"Bully","lastName":"Rooks","message":"local test 123"}. Execution time: 138ms
Go ahead and push up the changes to your feature branch and then merge to main when the build succeeds.
Verifying in the Cloud
Turn off all of your local instances first. They're in the same consumer group as the Okteto versions and could end up being the one to receive and process the message. Then hit the cloud-application endpoint for the Okteto instance in Postman.
We should see the message in Confluent.
You can see the logs in Kibana.
And you can see the full trace in Jaeger.
In particular, you can see that the calls through the controller respond faster, because the longer call to MongoDB is no longer on the original request path.
Note: there is an issue with OTEL and Kafka tracing. I had to make a change to my Dockerfile in order to get it working:
ENTRYPOINT ["java","-Dotel.instrumentation.common.experimental.suppress-messaging-receive-spans=true", "-jar", "app.jar"]