Kube Cloud Pt5 | Create an Event Consumer

Now that we’ve got messages being published to Kafka, we need to build the consumer that receives those events and stores them in the MongoDB database.

Go ahead and create a new repository called message-repository, following the microservice startup course.

This is the pom.xml that we’ll need:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>
	<parent>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-parent</artifactId>
		<version>2.6.3</version>
		<relativePath/> <!-- lookup parent from repository -->
	</parent>
	<groupId>com.bullyrooks</groupId>
	<artifactId>message-repository</artifactId>
	<version>0.0.1-SNAPSHOT</version>
	<name>message-repository</name>
	<description>Stores messages</description>
	<properties>
		<java.version>11</java.version>
		<spring-cloud.version>2021.0.1</spring-cloud.version>
		<org.mapstruct.version>1.4.2.Final</org.mapstruct.version>
		<logstash-logback-encoder.version>6.4</logstash-logback-encoder.version>
		<log4j2.version>2.17.1</log4j2.version>
		<logzio-logback-appender.version>v1.0.25</logzio-logback-appender.version>
		<micrometer-registry-logzio.version>1.0.2</micrometer-registry-logzio.version>
		<javafaker.version>1.0.2</javafaker.version>
	</properties>
	<dependencies>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-web</artifactId>
		</dependency>

		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-data-mongodb</artifactId>
		</dependency>
		<!-- Streams and Kafka -->
		<dependency>
			<groupId>org.springframework.cloud</groupId>
			<artifactId>spring-cloud-stream</artifactId>
		</dependency>
		<dependency>
			<groupId>org.springframework.cloud</groupId>
			<artifactId>spring-cloud-stream-binder-kafka</artifactId>
		</dependency>
		<dependency>
			<groupId>org.springframework.cloud</groupId>
			<artifactId>spring-cloud-stream</artifactId>
			<type>test-jar</type>
			<classifier>test-binder</classifier>
			<scope>test</scope>
		</dependency>

		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-devtools</artifactId>
			<scope>runtime</scope>
			<optional>true</optional>
		</dependency>
		<dependency>
			<groupId>org.projectlombok</groupId>
			<artifactId>lombok</artifactId>
			<optional>true</optional>
		</dependency>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-test</artifactId>
			<scope>test</scope>
		</dependency>


		<!-- Health Checks and Metrics -->
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-actuator</artifactId>
		</dependency>

		<!-- mapstruct -->
		<dependency>
			<groupId>org.mapstruct</groupId>
			<artifactId>mapstruct</artifactId>
			<version>${org.mapstruct.version}</version>
		</dependency>
		<dependency>
			<groupId>org.mapstruct</groupId>
			<artifactId>mapstruct-processor</artifactId>
			<version>${org.mapstruct.version}</version>
		</dependency>

		<!-- logging -->
		<dependency>
			<groupId>net.logstash.logback</groupId>
			<artifactId>logstash-logback-encoder</artifactId>
			<version>${logstash-logback-encoder.version}</version>
		</dependency>
		<dependency>
			<groupId>io.logz.logback</groupId>
			<artifactId>logzio-logback-appender</artifactId>
			<version>${logzio-logback-appender.version}</version>
		</dependency>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-aop</artifactId>
		</dependency>
		<dependency>
			<groupId>io.logz.micrometer</groupId>
			<artifactId>micrometer-registry-logzio</artifactId>
			<version>${micrometer-registry-logzio.version}</version>
		</dependency>

		<dependency>
			<groupId>com.github.javafaker</groupId>
			<artifactId>javafaker</artifactId>
			<version>${javafaker.version}</version>
		</dependency>

		<dependency>
			<groupId>de.flapdoodle.embed</groupId>
			<artifactId>de.flapdoodle.embed.mongo</artifactId>
			<scope>test</scope>
		</dependency>

	</dependencies>
	<dependencyManagement>
		<dependencies>
			<dependency>
				<groupId>org.springframework.cloud</groupId>
				<artifactId>spring-cloud-dependencies</artifactId>
				<version>${spring-cloud.version}</version>
				<type>pom</type>
				<scope>import</scope>
			</dependency>
		</dependencies>
	</dependencyManagement>

	<build>
		<plugins>
			<plugin>
				<groupId>org.springframework.boot</groupId>
				<artifactId>spring-boot-maven-plugin</artifactId>
				<configuration>
					<excludes>
						<exclude>
							<groupId>org.projectlombok</groupId>
							<artifactId>lombok</artifactId>
						</exclude>
					</excludes>
				</configuration>
			</plugin>
		</plugins>
	</build>

</project>

You can see we’re adding the Spring Cloud Stream libraries as well as the MongoDB ones.

Now make the MessageEvent class in com.bullyrooks.messagerepository.event.dto

package com.bullyrooks.messagerepository.event.dto;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class MessageEvent {
    private String messageId;
    private String firstName;
    private String lastName;
    private String message;
}

This is the object structure that incoming events will be deserialized into.

Now copy over the MessageRepository, MessageDocument and MessageDocumentMapper from the previous implementation (you can find them in the GitHub history of the cloud-application project if you didn’t stash the changes anywhere). I put them into com.bullyrooks.messagerepository.repository, under the document and mapper packages as needed.

There is one change to call out though. Since I don’t need a model object (the MessageDocument will work), the mapper maps directly from the event to the document.

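package com.bullyrooks.messagerepository.repository.mapper;

import com.bullyrooks.messagerepository.event.dto.MessageEvent;
import com.bullyrooks.messagerepository.repository.document.MessageDocument;
import org.mapstruct.Mapper;
import org.mapstruct.factory.Mappers;

@Mapper
public interface MessageDocumentMapper {
    MessageDocumentMapper INSTANCE = Mappers.getMapper(MessageDocumentMapper.class);

    MessageDocument eventToDocument(MessageEvent msgEvent);

    MessageEvent documentToEvent(MessageDocument returnDoc);
}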
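
To make the mapper less of a black box, here is a rough hand-written equivalent of what MapStruct generates for eventToDocument at compile time: a null check followed by a copy of each same-named property. This is a plain-Java sketch, not the generated source. The Event and Document classes are hypothetical stand-ins for MessageEvent and MessageDocument, and the document is assumed to carry the same four fields as the event.

```java
// Plain-Java sketch of the kind of code MapStruct generates; all names are hypothetical.
class MapperSketch {

    // Stand-in for MessageEvent (the Kafka payload).
    static final class Event {
        String messageId, firstName, lastName, message;
    }

    // Stand-in for MessageDocument (assumed to carry the same four fields).
    static final class Document {
        String messageId, firstName, lastName, message;
    }

    // Hand-written equivalent of eventToDocument:
    // null in, null out; otherwise copy field by field.
    static Document eventToDocument(Event event) {
        if (event == null) {
            return null;
        }
        Document doc = new Document();
        doc.messageId = event.messageId;
        doc.firstName = event.firstName;
        doc.lastName = event.lastName;
        doc.message = event.message;
        return doc;
    }

    public static void main(String[] args) {
        Event event = new Event();
        event.firstName = "Bully";
        event.lastName = "Rooks";
        event.message = "local test 123";

        Document doc = eventToDocument(event);
        System.out.println(doc.firstName + " " + doc.lastName + ": " + doc.message);
    }
}
```

Note that messageId stays null after mapping when the event does not carry one, which is what lets MongoDB assign the id on save.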

The MessageStorageService in com.bullyrooks.messagerepository.service is very stripped down

package com.bullyrooks.messagerepository.service;

import com.bullyrooks.messagerepository.config.LoggingEnabled;
import com.bullyrooks.messagerepository.repository.MessageRepository;
import com.bullyrooks.messagerepository.repository.document.MessageDocument;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.MeterRegistry;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
@Slf4j
@LoggingEnabled
public class MessageStorageService {

    MessageRepository messageRepository;
    MeterRegistry logzioMeterRegistry;

    Counter messageSaved;

    @Autowired
    public MessageStorageService(MessageRepository messageRepository,
                                 MeterRegistry logzioMeterRegistry){
        this.messageRepository = messageRepository;
        this.logzioMeterRegistry = logzioMeterRegistry;
        initCounters();
    }

    private void initCounters() {
        messageSaved = Counter.builder("message.stored.count")
                .description("Number of messages successfully stored in the repository")
                .register(logzioMeterRegistry);
    }


    public MessageDocument saveMessage(MessageDocument msgDoc){
        log.info("saving document: {}", msgDoc);
        MessageDocument returnDoc = messageRepository.save(msgDoc);
        messageSaved.increment();
        return returnDoc;
    }
}

It’s essentially a copy of the one from cloud-application, but it only writes to the repository.

Now let’s implement the Spring Cloud Stream consumer. Create a class called MessageEventConsumer in com.bullyrooks.messagerepository.event with this content:

package com.bullyrooks.messagerepository.event;

import com.bullyrooks.messagerepository.event.dto.MessageEvent;
import com.bullyrooks.messagerepository.repository.mapper.MessageDocumentMapper;
import com.bullyrooks.messagerepository.service.MessageStorageService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;

import java.util.function.Consumer;

@Component
@Slf4j
public class MessageEventConsumer {

    @Autowired
    MessageStorageService storageService;

    @Bean
    public Consumer<MessageEvent> consumeMessageEvent(){
        return (eventIn) -> storageService
                .saveMessage(MessageDocumentMapper.INSTANCE.eventToDocument(eventIn));
    }
}

This is all you need: a component that injects the service. The @Bean method defines the function, which has a special signature. The method itself takes no arguments. Instead, its Consumer<MessageEvent> return type tells Spring Cloud Stream that the function expects a MessageEvent as input, which is different from a normal method signature where the input is declared in the parentheses. The lambda then says that for each MessageEvent (eventIn), we call the mapper and send the result to the storage service. That’s all we have to do!
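
If the Consumer return type still feels odd, the following plain-Java sketch shows the same shape without Spring: a no-argument method that returns a java.util.function.Consumer, and a caller that invokes accept once per event. In the real service, Spring Cloud Stream plays the role of the caller. ConsumerSketch, consumeText and stored are hypothetical names used only for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Plain-Java sketch, no Spring involved. All names here are hypothetical.
class ConsumerSketch {

    // Stand-in for the storage service: it just remembers what it was given.
    static final List<String> stored = new ArrayList<>();

    // Same shape as consumeMessageEvent(): no parameters, returns the handler.
    static Consumer<String> consumeText() {
        return (textIn) -> stored.add("stored:" + textIn);
    }

    public static void main(String[] args) {
        // The factory method runs once to obtain the handler...
        Consumer<String> handler = consumeText();

        // ...and the caller (the binder, in the real service) calls accept() per event.
        handler.accept("first");
        handler.accept("second");

        System.out.println(stored);
    }
}
```

The point of the design is that the method describes what to do with an event, and the framework decides when and how often to call it.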

The application.yaml should look familiar. We’ve seen the MongoDB, health and Kafka configuration before.

spring:
  application:
    name: message-repository
  cloud:
    stream:
      bindings:
        consumeMessageEvent-in-0:
          destination: message.created
  kafka:
    properties:
      sasl.mechanism: PLAIN
      bootstrap.servers: pkc-2396y.us-east-1.aws.confluent.cloud:9092
      security.protocol: SASL_SSL
  data:
    mongodb:
      uri: mongodb+srv://bullyrooks:${mongodb.password}@bullyrooks.4zqpz.mongodb.net/bullyrooks?retryWrites=true&w=majority
management:
  endpoints:
    web:
      exposure:
        include: "*"
  endpoint:
    health:
      probes:
        enabled: true
      show-details: always
      group:
        readiness:
          include: "readinessState,mongo,messageGenerator"
    metrics:
      enabled: true

logging:
  level:
    root: INFO
    com.bullyrooks: DEBUG
    io.micrometer.logzio: WARN
logzio:
  metrics:
    url: https://listener.logz.io:8053
    registry:
      mock: false 

The unique part here is that we’re defining an input (in) binding for the topic, and we need to use the same topic name (destination) that cloud-application is publishing to.
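
The binding name is not arbitrary. Spring Cloud Stream derives it from the function bean name using the pattern <functionName>-in-<index> for inputs, which is why the key in the configuration is consumeMessageEvent-in-0. The small helper below is a hypothetical illustration of that naming pattern, not part of any Spring API.

```java
// Hypothetical helper that mirrors the input binding naming pattern.
class BindingNameSketch {

    // Builds "<functionName>-in-<index>", e.g. for a Consumer bean method name.
    static String inputBindingName(String functionName, int index) {
        return functionName + "-in-" + index;
    }

    public static void main(String[] args) {
        // The bean method in MessageEventConsumer is called consumeMessageEvent
        // and has a single input, so the index is 0.
        System.out.println(inputBindingName("consumeMessageEvent", 0));
        // prints "consumeMessageEvent-in-0"
    }
}
```

If you rename the bean method, the key under spring.cloud.stream.bindings has to change with it, otherwise the destination setting no longer applies to the function.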

The MessageEventConsumerTest in src/test/java/com/bullyrooks/messagerepository/event is going to look a little new though

package com.bullyrooks.messagerepository.event;

import com.bullyrooks.messagerepository.event.dto.MessageEvent;
import com.bullyrooks.messagerepository.repository.MessageRepository;
import com.bullyrooks.messagerepository.repository.document.MessageDocument;
import com.github.javafaker.Faker;
import com.github.javafaker.service.FakeValuesService;
import com.github.javafaker.service.RandomService;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.autoconfigure.data.mongo.AutoConfigureDataMongo;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.cloud.stream.binder.test.InputDestination;
import org.springframework.cloud.stream.binder.test.TestChannelBinderConfiguration;
import org.springframework.context.annotation.Import;
import org.springframework.data.domain.Example;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.GenericMessage;
import org.springframework.test.context.ActiveProfiles;
import org.springframework.test.context.junit.jupiter.SpringExtension;

import java.util.Locale;

import static org.junit.jupiter.api.Assertions.assertEquals;

@ExtendWith(SpringExtension.class)
@SpringBootTest
@AutoConfigureDataMongo
@Slf4j
@ActiveProfiles("test")
@Import(TestChannelBinderConfiguration.class)
public class MessageEventConsumerTest {

    @Autowired
    MessageRepository messageRepository;


    FakeValuesService fakesvc = new FakeValuesService(
            new Locale("en-US"), new RandomService());
    Faker faker = new Faker();

    @Autowired
    private InputDestination inputDestination;

    @Test
    void testMessageStore() {

        //given
        MessageEvent msgEventIn = MessageEvent.builder()
                .firstName(faker.name().firstName())
                .lastName(faker.name().lastName())
                .message(faker.gameOfThrones().quote()).build();
        Message<MessageEvent> messageIn = new GenericMessage<>(msgEventIn);

        //when
        inputDestination.send(messageIn);

        //then
        Example<MessageDocument> example = Example.of(
                MessageDocument.builder()
                        .firstName(msgEventIn.getFirstName())
                        .lastName(msgEventIn.getLastName())
                        .message(msgEventIn.getMessage())
                        .build());
        MessageDocument doc = messageRepository.findOne(example).get();
        assertEquals(msgEventIn.getFirstName(), doc.getFirstName());
        assertEquals(msgEventIn.getLastName(), doc.getLastName());
        assertEquals(msgEventIn.getMessage(), doc.getMessage());
    }

}

We’re using the TestChannelBinderConfiguration to inject an InputDestination this time. The input is a MessageEvent that we wrap into a GenericMessage object that we then post to that destination. Additionally, since we don’t have an id to search the repo for, we use an example query object to search for the record we expect to be there.
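
As a rough mental model of the example query: properties left null in the probe (here, messageId) are ignored, and the remaining properties must match. The plain-Java sketch below imitates that rule in memory. It is a simplification of the default matching behaviour, since the real query runs in MongoDB, and Doc, fieldMatches and matches are hypothetical names.

```java
// In-memory imitation of "null probe fields are ignored"; all names are hypothetical.
class ExampleMatchSketch {

    // Stand-in for MessageDocument.
    static final class Doc {
        final String messageId, firstName, lastName, message;

        Doc(String messageId, String firstName, String lastName, String message) {
            this.messageId = messageId;
            this.firstName = firstName;
            this.lastName = lastName;
            this.message = message;
        }
    }

    // A null probe value means "don't care"; a non-null value must be equal.
    static boolean fieldMatches(String probeValue, String actualValue) {
        return probeValue == null || probeValue.equals(actualValue);
    }

    // A candidate matches when every non-null probe field matches.
    static boolean matches(Doc probe, Doc candidate) {
        return fieldMatches(probe.messageId, candidate.messageId)
                && fieldMatches(probe.firstName, candidate.firstName)
                && fieldMatches(probe.lastName, candidate.lastName)
                && fieldMatches(probe.message, candidate.message);
    }

    public static void main(String[] args) {
        Doc saved = new Doc("621bfc3369057f23a56b7d2e", "Bully", "Rooks", "local test 123");
        // The test does not know the generated id, so the probe leaves it null.
        Doc probe = new Doc(null, "Bully", "Rooks", "local test 123");

        System.out.println(matches(probe, saved)); // true
    }
}
```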

The default test for MessageRepositoryApplication needs an update as well. It needs to know about mongo in order to actually be able to run. Add these annotations

@SpringBootTest
@AutoConfigureDataMongo
@ActiveProfiles("test")
class MessageRepositoryApplicationTests {

	@Test
	void contextLoads() {
	}

}

The application-test.yaml in the src/test/resources directory needs to know the mongodb version

spring:
  mongodb:
    embedded:
      version: 2.6.10 # Version of Mongo to use.

Finally, add the same environment configuration to the Helm deployment.yaml file as we did for cloud-application. Make sure to copy the MongoDB config over as well.

          - name: SPRING_DATA_MONGODB_URI
            valueFrom:
              secretKeyRef:
                name: mongo-secrets
                key: SPRING_DATA_MONGODB_URI
          - name: SPRING_KAFKA_PROPERTIES_SASL_JAAS_CONFIG
            valueFrom:
              secretKeyRef:
                name: confluent-secrets
                key: SPRING_KAFKA_PROPERTIES_SASL_JAAS_CONFIG

Verifying

We could run this locally, but we won’t be able to test it directly unless we have a way to push messages onto the Kafka topic. One way to test is to run both cloud-application and message-repository locally. You should see message requests generate events that are consumed by message-repository in the logs:

2022-02-27 15:33:23,588 DEBUG [,] c.b.m.c.LoggingAspect: -> MessageStorageService.saveMessage invocation.  params: {"msgDoc":{"messageId":null,"firstName":"Bully","lastName":"Rooks","message":"local test 123"}}
2022-02-27 15:33:23,592 INFO  [,] c.b.m.s.MessageStorageService: saving document: MessageDocument(messageId=null, firstName=Bully, lastName=Rooks, message=local test 123)
2022-02-27 15:33:23,728 DEBUG [,] c.b.m.c.LoggingAspect: <- MessageStorageService.saveMessage returns:{"messageId":"621bfc3369057f23a56b7d2e","firstName":"Bully","lastName":"Rooks","message":"local test 123"}.  Execution time: 138ms

Go ahead and push up the changes to your feature branch and then merge to main when the build succeeds.

Verifying in the Cloud

Turn off all of your local instances. They’re currently in the same consumer group as the Okteto versions and could be the recipient of the message to process. Hit the cloud-application endpoint for the Okteto instance in Postman.

We should see the message in Confluent.

You can see the logs in Kibana.

And you can see the full trace in Jaeger.

In particular, you can see that the calls through the controller respond faster because the longer call to mongodb is not on the original request path now.

Note: there is an issue with OTEL and Kafka tracing. I had to make a change to my Dockerfile in order to get it working:

ENTRYPOINT ["java","-Dotel.instrumentation.common.experimental.suppress-messaging-receive-spans=true", "-jar", "app.jar"]
