WildFly, Arquillian, Testcontainers, and Kafka
Wednesday, August 24, 2022 |Back again with another Testcontainers example. This time, though, the environment is a bit different. We’ll be looking at a Jakarta EE application using WildFly and MicroProfile Reactive Messaging (MP RM), and we’re going to test it using Arquillian and Testcontainers. Let’s get to it. :)
The Application
To make things simple, we’ll develop a really simple application. It will have one endpoint that takes an entity, then publishes it to a Kafka stream. It’s not a very interesting app, but should be enough for demonstration purposes.
While you can see the entire application in the repo, for completeness' sake, let’s show the pieces, starting with the code:
1
2
3
4
5
6
7
8
9
10
11
12
@Path("/")
@RequestScoped
public class MyResource {
@Inject
private MyService service;
@POST
public Response create(MyModel model) {
return Response.ok(service.sendModel(model))
.build();
}
}
This just takes a MyModel
instance, passes it to the service, the passing along to the client what the service returns.
1
2
3
4
5
6
7
8
9
10
11
12
13
@RequestScoped
public class MyService {
@Inject
@Channel("model")
Emitter<MyModel> emitter;
public MyModel sendModel(MyModel model) {
emitter.send(model);
return model;
}
}
In the service, we @Inject
a MicroProfile Reactive Messaging Emitter
, which is annotated with the name of the channel to which MyModel
instances will be emitted. With that in hand, we send the model, and return the value. The real magic, if you can call it that, is in MessageService
:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
@Dependent
public class MessageService {
@Incoming("model")
@Outgoing("model-event")
public Message<String> sendToKafka(MyModel model) {
String data = JsonbBuilder.create().toJson(model);
Message<String> m = Message.of(data);
// Create Metadata containing the Kafka key
OutgoingKafkaRecordMetadata<String> md = OutgoingKafkaRecordMetadata
.<String>builder().withKey(model.getId().toString())
.build();
// The returned message will have the metadata added
return KafkaMetadataUtil.writeOutgoingKafkaMetadata(m, md);
}
}
Things to note:
-
The method
sendToKafka()
is annotated with@Incoming("channel")
. Thanks to the magic of Reactive Messaging, any time aMyModel
is emitted to that channel, this method will be called with thatMyModel
instance. -
The method is also annotated with
@Outgoing("model-event")
. Whatever this method returns will be published to themodel-event
stream. Notice that we use some Kafka APIs to add some metadata to the object before writing.
Now we need to set up the MP RM wiring for Kafka, which we do via MicroProfile Config:
1
2
3
4
5
6
7
mp.messaging.connector.smallrye-kafka.bootstrap.servers=localhost:9092
mp.messaging.connector.smallrye-kafka.group.id="watkdemo"
mp.messaging.outgoing.model-event.connector=smallrye-kafka
mp.messaging.outgoing.model-event.topic=ModelEvent
mp.messaging.outgoing.model-event.key.serializer=org.apache.kafka.common.serialization.StringSerializer
mp.messaging.outgoing.model-event.value.serializer=org.apache.kafka.common.serialization.StringSerializer
In theory, once you start up Kafka and WildFly (as described in the README), you should be able to deploy and test the application:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
$ mvn package wildfly:deploy
$ http -v POST :8080/wildfly-arq-testcontainers-kafka-1.0-SNAPSHOT \
foo=blah bar=49152
POST /wildfly-arq-testcontainers-kafka-1.0-SNAPSHOT HTTP/1.1
Accept: application/json, */*;q=0.5
Accept-Encoding: gzip, deflate, br
Connection: keep-alive
Content-Length: 31
Content-Type: application/json
Host: localhost:8080
User-Agent: HTTPie/3.2.1
{
"bar": "49152",
"foo": "blah"
}
HTTP/1.1 200 OK
Connection: keep-alive
Content-Length: 70
Content-Type: application/json
Date: Tue, 23 Aug 2022 21:54:02 GMT
{
"bar": 49152,
"foo": "blah",
"id": "f76acba6-08fd-4ff1-a357-c9d23d471154"
}
Voilà! Now how do we test it?
The Test
I’ve skipped looking at the Maven POM so far, as it’s pretty run-of-the-mill for the application part, but to get a test set up, we’re going to need peel back the covers on it. Let’s start at the top:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
<properties>
<maven.compiler.source>11</maven.compiler.source>
<maven.compiler.target>11</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<version.arquillian>1.6.0.Final</version.arquillian>
<version.failsafe.plugin>${version.surefire.plugin}</version.failsafe.plugin>
<version.surefire.plugin>2.22.2</version.surefire.plugin>
<version.testcontainers>1.17.3</version.testcontainers>
<version.wildfly.maven.plugin>3.0.2.Final</version.wildfly.maven.plugin>
<version.wildfly>26.1.1.Final</version.wildfly>
<wildfly.dir>${project.basedir}/target/wildfly-${version.wildfly}</wildfly.dir>
</properties>
The interesting portion here is the version definitions. Feel free use Java 17 or later if you like. :)
The integration test profile
First step, let’s set up our integration tests in a profile
. This allows us to have normal unit testing as part of the build and save the integration tests for CI or an explicit manual run for faster feedback and build loops:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
<profiles>
<profile>
<!-- An optional Arquillian testing profile that executes tests in your JBoss EAP instance.
This profile will start a new JBoss EAP instance, and execute the test, shutting it
down when done. Run with: mvn clean verify -Parq-managed -->
<id>arq-managed</id>
<build>
<defaultGoal>verify</defaultGoal>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-failsafe-plugin</artifactId>
<version>${version.failsafe.plugin}</version>
<executions>
<execution>
<goals>
<goal>integration-test</goal>
<goal>verify</goal>
</goals>
<configuration>
<systemPropertyVariables>
<arquillian.launch>jboss</arquillian.launch>
<java.util.logging.manager>org.jboss.logmanager.LogManager</java.util.logging.manager>
<jboss.home>${wildfly.dir}</jboss.home>
<kafka.server>${kafka.server}</kafka.server>
</systemPropertyVariables>
<redirectTestOutputToFile>false</redirectTestOutputToFile>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</profile>
</profiles>
Here we’re simply configuring the maven-failsafe-plugin
, and setting the default goal to verify
should this profile be activated (via -Parq-managed
on the Maven command line).
We’re also setting some system properties, the most notable of which is the Arquillian config we want to launch (arquillian.launch
), and the location of the WildFly (jboss.home
) and Kafka (kafka.server
) servers. We’ll fill in those details shortly.
Arquillian Config
To configure Arquillian, we need an arquillian.xml
config file:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
<?xml version="1.0" encoding="UTF-8"?>
<arquillian xmlns="http://jboss.org/schema/arquillian"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://jboss.org/schema/arquillian
http://jboss.org/schema/arquillian/arquillian_1_0.xsd">
<engine>
<property name="deploymentExportPath">target/deployments</property>
</engine>
<container qualifier="jboss" default="true">
<configuration>
<property name="allowConnectingToRunningServer">true</property>
</configuration>
</container>
</arquillian>
I’m a big fan of holding on the deployment archives that are created for Arquillian testing, so we configure that in the engine
section.
In order to test on WildFly, we’ll need a WildFly instance. Arquillian supports a few different operating modes for the test server, but we’re interested in managed
, which means Arquillian will start and stop the server as needed. The WildFly connector for Arquillian, though, is going to require that it be pointed at a local installation (and not a zip). Downloading and extracting zips via Maven isn’t very pretty (IMO), but, fortunately, the WildFly Maven Plugin lets us build the exact server we want, so let’s do that.
The Test WildFly Instance
First, we’ll define a version in the pluginManagement
section of the build. Declaring this in the main build allows us to use it deploy the application, as well as to provision a test server.
1
2
3
4
5
6
7
8
9
10
11
<build>
<pluginManagement>
<plugins>
<plugin>
<groupId>org.wildfly.plugins</groupId>
<artifactId>wildfly-maven-plugin</artifactId>
<version>${version.wildfly.maven.plugin}</version>
</plugin>
</plugins>
</pluginManagement>
</build>
Next, in our arq-managed
profile, we configure a use of the plugin to provision our server:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
<plugin>
<groupId>org.wildfly.plugins</groupId>
<artifactId>wildfly-maven-plugin</artifactId>
<executions>
<execution>
<id>provision-server</id>
<phase>pre-integration-test</phase>
<goals>
<goal>provision</goal>
</goals>
<configuration>
<recordProvisioningState>true</recordProvisioningState>
<feature-packs>
<feature-pack>
<location>
org.wildfly:wildfly-cloud-legacy-galleon-pack:${version.wildfly}
</location>
</feature-pack>
</feature-packs>
<layers>
<layer>jaxrs-server</layer>
<layer>microprofile-reactive-messaging</layer>
<layer>microprofile-reactive-messaging-kafka</layer>
</layers>
</configuration>
</execution>
</executions>
<configuration>
<provisioning-dir>${wildfly.dir}</provisioning-dir>
</configuration>
</plugin>
When the plugin executes, it will build a server based on version ${version.wildfly}
, and add support for only JAX-RS and MicroProfile Reactive Messaging with Kafka support (and any needed dependencies). This gives us a thinner, smaller server to work with.
This works great for testing, but you can also use this approach (via the Galleon command line utility), to build slimmed down server for production deployments, but that’s a topic for another day. :) |
The generated server is put in ${project.basedir}/target
via provisioning-dir
(and the property defined above) so we can easily clean up after ourselves. Note that we use value to set jboss.home
above in the maven-failsafe-pugin
configuration so Arquillian can find the server.
That’s a lot of steps already, but we’re still not quite ready to write tests yet. We need a Kafka server.
The Test Kafka Instance
We’re going to use Testcontainers to manage our Kafka instance. If you read my 'Quarkus Dev Services, jOOQ, Flyway, and Testcontainers: A Full Example' post, this approach will be familiar to you. We’ll use the groovy-maven-plugin
to create a Testcontainer-based Kafka instance, and pass the relevant information to the test via system properties.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
<plugin>
<groupId>org.codehaus.gmaven</groupId>
<artifactId>groovy-maven-plugin</artifactId>
<version>2.1.1</version>
<executions>
<execution>
<id>kafka</id>
<phase>pre-integration-test</phase>
<goals>
<goal>execute</goal>
</goals>
<configuration>
<source>
def image = org.testcontainers.utility.DockerImageName.parse("confluentinc/cp-kafka").withTag("7.2.1")
def kafka = new org.testcontainers.containers.KafkaContainer(image)
kafka.start()
project.properties.setProperty('kafka.server', kafka.bootstrapServers)
</source>
</configuration>
</execution>
</executions>
<dependencies>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>kafka</artifactId>
<version>${version.testcontainers}</version>
</dependency>
</dependencies>
</plugin>
There’s nothing particularly interesting here if you’re familiar with Testcontainers, but here’s a summary
-
We parse an image name (
confluentinc/cp-kafka:7.2.1
) -
We create an instance of
KafkaContainer
, using that image -
We start the server
-
We get the
bootstrapServers
value, and assign that to a build property -
In the
maven-failsafe-plugin
config above, we set a system property using this build property
As the build finishes and the JVM exits, the container is shut down and cleaned up. It’s really pretty slick.
Take a deep breath — and maybe a coffee break — as we’re in the home stretch. It’s actually time to write a test. :)
The Test. For Real.
Here’s (most of) the test class:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
@RunWith(Arquillian.class)
public class MyServiceIT {
@ArquillianResource
private URL url;
@Deployment
public static Archive getDeployment() throws IOException {
String config = Files.readString(Path.of("src/main/resources/META-INF/microprofile-config.properties"));
config = config.replaceAll("localhost:9092", System.getProperty("kafka.server"));
return ShrinkWrap.create(WebArchive.class, "test.war")
.addAsWebInfResource(EmptyAsset.INSTANCE, "beans.xml") // Warning: This breaks in EE 10
.addAsResource(new StringAsset(config), "META-INF/microprofile-config.properties")
.addPackages(true, MyService.class.getPackage().getName());
}
@Test
@RunAsClient
public void sendMessage() throws Exception {
int count = 0;
boolean found = false;
sendRestRequest();
KafkaConsumer<String, String> consumer = getConsumer();
consumer.subscribe(Collections.singleton("ModelEvent"));
while (!found && count < 10) {
consumer.seekToBeginning(consumer.assignment());
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
count++;
for (ConsumerRecord<String, String> r : records) {
found = true;
System.out.println("***** Message received: " + r.value());
}
}
assertTrue("Message not found in stream", found);
}
}
I don’t want to spend too much time on Arquillian specifics, so we’ll move fast here:
-
We annotate the class with
@RunWith(Arquillian.class)
to make this an Arquillian test. -
This class will be wrapped up and deployed to the server, so we can
@Inject
the class we want to test (MyService
) -
We do, though, need to define what to deploy, so we have an
@Deployment
.-
To make things simpler, I’m simply adding everything under the package
com.steeplesoft.watkdemo
-
I’m also reading the MP Config file and changing the value for the Kafka server to point to our test containers. There are probably smarter ways of doing this, but they eluded me long enough that I went with the sledgehammer. :P
-
The test is where things get interesting. We’re going to do an end-to-end test (thus "integration test"), from REST request to Kafka stream, so we make this test as @RunAsClient
. In it, send the REST request (see below), then we connect to our test Kafka server and poll it until either we find our message, or we time out.
I’m not a Kafka expert, so please be kind. :) If you know a better way to do this, then please feel free. You can also find me and clue me in. :P |
To send the request, we have this method, using Java 11’s HttpClient
:
1
2
3
4
5
6
7
8
9
10
private void sendRestRequest() throws Exception {
HttpRequest request = HttpRequest.newBuilder(url.toURI())
.header("Accept", MediaType.APPLICATION_JSON)
.header("Content-type", MediaType.APPLICATION_JSON)
.POST(HttpRequest.BodyPublishers.ofString(
new ObjectMapper().writeValueAsString(new MyModel("foo", 49152))))
.build();
HttpResponse<String> response = HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString());
assertEquals(response.statusCode(), 200);
}
We should have all the pieces in place, so let’s run the test. You should see something like this:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
$ mvn -Parq-managed verify
...
[INFO] Checking the system...
[INFO] ✔︎ Docker server version should be at least 1.6.0
[INFO] Creating container for image: confluentinc/cp-kafka:7.2.1
[INFO] Creating container for image: testcontainers/ryuk:0.3.3
[INFO] Container testcontainers/ryuk:0.3.3 is starting: 41c8a0373fe07619232a748df3e39d2ef40425b43bc203f3188a5b24260c3113
[INFO] Container testcontainers/ryuk:0.3.3 started in PT0.414602S
[INFO] Container confluentinc/cp-kafka:7.2.1 is starting: 5ec0656459ece52dbf69e9cb99f0f7c8cfabc81686fbed8036c25500289da31b
[INFO] Container confluentinc/cp-kafka:7.2.1 started in PT4.465336S
...
INFO] Running com.steeplesoft.watkdemo.MyServiceIT
Aug 24, 2022 12:49:20 PM org.jboss.threads.Version <clinit>
INFO: JBoss Threads version 2.3.0.Beta2
Aug 24, 2022 12:49:20 PM org.jboss.as.arquillian.container.CommonManagedDeployableContainer startInternal
...
12:49:23,413 INFO [org.jboss.as.server.deployment] (MSC service thread 1-2) WFLYSRV0027: Starting deployment of "test.war" (runtime-name: "test.war")
12:49:23,830 INFO [org.jboss.weld.deployer] (MSC service thread 1-4) WFLYWELD0003: Processing weld deployment test.war
...
12:49:26,073 INFO [org.apache.kafka.clients.Metadata] (kafka-producer-network-thread | kafka-producer-model-event) [Producer clientId=kafka-producer-model-event] Resetting the last seen epoch of partition ModelEvent-0 to 0 since the associated topicId changed from null to oOHfjaIyR-S-gvivnsnGYg
***** Message received: {"bar":49152,"foo":"foo","id":"2b66b9c4-3de5-40a4-91b6-e7b511f5233b"}
12:49:26,866 INFO [org.wildfly.extension.undertow] (ServerService Thread Pool -- 15) WFLYUT0022: Unregistered web context: '/test' from server 'default-server'
12:49:26,871 INFO [org.apache.kafka.clients.producer.KafkaProducer] (smallrye-kafka-producer-thread-0) [Producer clientId=kafka-producer-model-event] Closing the Kafka producer with timeoutMillis = 10000 ms.
...
[INFO] Tests run: 1, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 6.426 s - in com.steeplesoft.watkdemo.MyServiceIT
In that brief log excerpt, we can see
-
The test container spinning up
-
WildFly starting
-
The output from our test when we find the message
-
And the most important part:
Tests run: 1, Failures: 0, Errors: 0, Skipped: 0
We’re Done!
While the application under test is pretty simple, hopefully this example will give you enough to test your application if you have a similar architectural stack. If you have questions, comments, concerns, etc., you can find me on Twitter. :)