diff --git a/server-common/src/test/java/org/apache/kafka/queue/KafkaEventQueueTest.java b/server-common/src/test/java/org/apache/kafka/queue/KafkaEventQueueTest.java index 893ef4ba9f513..ba4b562ae5c6e 100644 --- a/server-common/src/test/java/org/apache/kafka/queue/KafkaEventQueueTest.java +++ b/server-common/src/test/java/org/apache/kafka/queue/KafkaEventQueueTest.java @@ -425,6 +425,17 @@ public void testInterruptedWithDeferredEvents() throws Exception { } } + /** + * Wait for the queue's event handler thread to enter cond.await(), ensuring + * that startIdleMs has been captured before the test advances MockTime. + */ + private static void waitForQueueThreadToBeIdle(Thread queueThread) throws InterruptedException { + TestUtils.waitForCondition( + () -> queueThread.getState() == Thread.State.WAITING, + "Queue thread should be waiting" + ); + } + @Test public void testIdleTimeCallback() throws Exception { MockTime time = new MockTime(); @@ -440,6 +451,11 @@ public void testIdleTimeCallback() throws Exception { lastIdleTimeMs.set(idleDuration); lastCurrentTimeMs.set(currentTime); })) { + // Capture the queue's event handler thread so we can wait for it to be idle. + CompletableFuture queueThreadFuture = new CompletableFuture<>(); + queue.append(() -> queueThreadFuture.complete(Thread.currentThread())); + Thread queueThread = queueThreadFuture.get(); + time.sleep(2); assertEquals(0, lastIdleTimeMs.get(), "Last idle time should be 0ms"); @@ -451,6 +467,8 @@ public void testIdleTimeCallback() throws Exception { })); assertEquals("event1-processed", event1.get()); + waitForQueueThreadToBeIdle(queueThread); + long timeBeforeWait = time.milliseconds(); long waitTime5Ms = 5; time.sleep(waitTime5Ms); @@ -464,6 +482,8 @@ public void testIdleTimeCallback() throws Exception { assertEquals(timeBeforeWait + waitTime5Ms, lastCurrentTimeMs.get(), "Current time should be " + (timeBeforeWait + waitTime5Ms) + "ms, was: " + lastCurrentTimeMs.get()); // Test 2: Deferred event + waitForQueueThreadToBeIdle(queueThread); + long timeBeforeDeferred = time.milliseconds(); long waitTime2Ms = 2; CompletableFuture deferredEvent2 = new CompletableFuture<>();