diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java index 1dcedf4370d7..f5e5adab1556 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java @@ -182,6 +182,7 @@ public final class StreamingDataflowWorker { private final ComputationConfig.Fetcher configFetcher; private final ComputationStateCache computationStateCache; private final BoundedQueueExecutor workUnitExecutor; + private final ScheduledExecutorService commitFinalizerCleanupExecutor; private final AtomicReference streamingWorkerHarness = new AtomicReference<>(); private final AtomicBoolean running = new AtomicBoolean(); @@ -208,6 +209,7 @@ private StreamingDataflowWorker( ComputationStateCache computationStateCache, WindmillStateCache windmillStateCache, BoundedQueueExecutor workUnitExecutor, + ScheduledExecutorService commitFinalizerCleanupExecutor, DataflowMapTaskExecutorFactory mapTaskExecutorFactory, DataflowWorkerHarnessOptions options, HotKeyLogger hotKeyLogger, @@ -232,6 +234,7 @@ private StreamingDataflowWorker( Executors.newCachedThreadPool()); this.options = options; this.workUnitExecutor = workUnitExecutor; + this.commitFinalizerCleanupExecutor = commitFinalizerCleanupExecutor; this.harnessSwitchExecutor = Executors.newSingleThreadExecutor( new ThreadFactoryBuilder().setNameFormat("HarnessSwitchExecutor").build()); @@ -252,6 +255,7 @@ private StreamingDataflowWorker( readerCache, mapTaskExecutorFactory, workUnitExecutor, + commitFinalizerCleanupExecutor, this.stateCache::forComputation, failureTracker, workFailureProcessor, @@ -618,6 +622,13 @@ public static StreamingDataflowWorker fromOptions(DataflowWorkerHarnessOptions o StreamingCounters streamingCounters = StreamingCounters.create(); WorkUnitClient dataflowServiceClient = new DataflowWorkUnitClient(options, LOG); BoundedQueueExecutor workExecutor = createWorkUnitExecutor(options); + ScheduledExecutorService commitFinalizerCleanupExecutor = + Executors.newScheduledThreadPool( + 1, + new ThreadFactoryBuilder() + .setNameFormat("FinalizationCallbackCleanup-%d") + .setDaemon(true) + .build()); WindmillStateCache windmillStateCache = WindmillStateCache.builder() .setSizeMb(options.getWorkerCacheMb()) @@ -682,6 +693,7 @@ public static StreamingDataflowWorker fromOptions(DataflowWorkerHarnessOptions o computationStateCache, windmillStateCache, workExecutor, + commitFinalizerCleanupExecutor, IntrinsicMapTaskExecutorFactory.defaultFactory(), options, new HotKeyLogger(), @@ -844,6 +856,13 @@ static StreamingDataflowWorker forTesting( WindmillStubFactoryFactory stubFactory) { ConcurrentMap stageInfo = new ConcurrentHashMap<>(); BoundedQueueExecutor workExecutor = createWorkUnitExecutor(options); + ScheduledExecutorService commitFinalizerCleanupExecutor = + Executors.newScheduledThreadPool( + 1, + new ThreadFactoryBuilder() + .setNameFormat("FinalizationCallbackCleanup-%d") + .setDaemon(true) + .build()); WindmillStateCache stateCache = WindmillStateCache.builder() .setSizeMb(options.getWorkerCacheMb()) @@ -932,6 +951,7 @@ static StreamingDataflowWorker forTesting( computationStateCache, stateCache, workExecutor, + commitFinalizerCleanupExecutor, mapTaskExecutorFactory, options, hotKeyLogger, @@ -1123,6 +1143,7 @@ void stop() { streamingWorkerHarness.get().shutdown(); memoryMonitor.shutdown(); workUnitExecutor.shutdown(); + commitFinalizerCleanupExecutor.shutdown(); computationStateCache.closeAndInvalidateAll(); workerStatusReporter.stop(); } catch (Exception e) { diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingCommitFinalizer.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingCommitFinalizer.java index 4266f11f50c9..5a66545ab335 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingCommitFinalizer.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingCommitFinalizer.java @@ -17,17 +17,14 @@ */ package org.apache.beam.runners.dataflow.worker.windmill.work.processing; -import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState; - import com.google.auto.value.AutoValue; import java.util.ArrayList; -import java.util.Comparator; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.PriorityQueue; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; -import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; import javax.annotation.Nullable; import javax.annotation.concurrent.GuardedBy; @@ -56,61 +53,34 @@ public abstract static class FinalizationInfo { public abstract Runnable getCallback(); - public static FinalizationInfo create(Long id, Instant expiryTime, Runnable callback) { - return new AutoValue_StreamingCommitFinalizer_FinalizationInfo(id, expiryTime, callback); + public abstract ScheduledFuture getCleanupFuture(); + + public static FinalizationInfo create( + Long id, Instant expiryTime, Runnable callback, ScheduledFuture cleanupFuture) { + return new AutoValue_StreamingCommitFinalizer_FinalizationInfo( + id, expiryTime, callback, cleanupFuture); } } private final ReentrantLock lock = new ReentrantLock(); - private final Condition queueMinChanged = lock.newCondition(); @GuardedBy("lock") private final HashMap commitFinalizationCallbacks = new HashMap<>(); - @GuardedBy("lock") - private final PriorityQueue cleanUpQueue = - new PriorityQueue<>(11, Comparator.comparing(FinalizationInfo::getExpiryTime)); - - @GuardedBy("lock") - private boolean cleanUpThreadStarted = false; - private final BoundedQueueExecutor finalizationExecutor; - private StreamingCommitFinalizer(BoundedQueueExecutor finalizationCleanupExecutor) { - finalizationExecutor = finalizationCleanupExecutor; - } + // The cleanup threads run in their own Executor, so they don't block processing. + private final ScheduledExecutorService cleanupExecutor; - private void cleanupThreadBody() { - lock.lock(); - try { - while (true) { - final @Nullable FinalizationInfo minValue = cleanUpQueue.peek(); - if (minValue == null) { - // Wait for an element to be added and loop to re-examine the min. - queueMinChanged.await(); - continue; - } - - Instant now = Instant.now(); - Duration timeDifference = new Duration(now, minValue.getExpiryTime()); - if (timeDifference.getMillis() < 0 - || (queueMinChanged.await(timeDifference.getMillis(), TimeUnit.MILLISECONDS) - && cleanUpQueue.peek() == minValue)) { - // The minimum element has an expiry time before now, either because it had elapsed when - // we pulled it or because we awaited it, and it is still the minimum. - checkState(minValue == cleanUpQueue.poll()); - checkState(commitFinalizationCallbacks.remove(minValue.getId()) == minValue); - } - } - } catch (InterruptedException e) { - // We're being shutdown. - } finally { - lock.unlock(); - } + private StreamingCommitFinalizer( + BoundedQueueExecutor finalizationExecutor, ScheduledExecutorService cleanupExecutor) { + this.finalizationExecutor = finalizationExecutor; + this.cleanupExecutor = cleanupExecutor; } - static StreamingCommitFinalizer create(BoundedQueueExecutor workExecutor) { - return new StreamingCommitFinalizer(workExecutor); + static StreamingCommitFinalizer create( + BoundedQueueExecutor workExecutor, ScheduledExecutorService cleanupExecutor) { + return new StreamingCommitFinalizer(workExecutor, cleanupExecutor); } /** @@ -118,37 +88,47 @@ static StreamingCommitFinalizer create(BoundedQueueExecutor workExecutor) { * has been successfully committed to the backing state store. */ public void cacheCommitFinalizers(Map> callbacks) { - for (Map.Entry> entry : callbacks.entrySet()) { - Long finalizeId = entry.getKey(); - final FinalizationInfo info = - FinalizationInfo.create( - finalizeId, entry.getValue().getLeft(), entry.getValue().getRight()); - - lock.lock(); - try { - FinalizationInfo existingInfo = commitFinalizationCallbacks.put(finalizeId, info); + if (callbacks.isEmpty()) { + return; + } + Instant now = Instant.now(); + lock.lock(); + try { + for (Map.Entry> entry : callbacks.entrySet()) { + Instant cleanupTime = entry.getValue().getLeft(); + // Ignore finalizers that have already expired. + if (cleanupTime.isBefore(now)) { + continue; + } + ScheduledFuture cleanupFuture = + cleanupExecutor.schedule( + () -> { + lock.lock(); + try { + commitFinalizationCallbacks.remove(entry.getKey()); + } finally { + lock.unlock(); + } + }, + new Duration(now, cleanupTime).getMillis(), + TimeUnit.MILLISECONDS); + FinalizationInfo info = + FinalizationInfo.create( + entry.getKey(), + entry.getValue().getLeft(), + entry.getValue().getRight(), + cleanupFuture); + FinalizationInfo existingInfo = commitFinalizationCallbacks.put(info.getId(), info); if (existingInfo != null) { throw new IllegalStateException( "Expected to not have any past callbacks for bundle " - + finalizeId + + info.getId() + " but had " + existingInfo); } - if (!cleanUpThreadStarted) { - // Start the cleanup thread lazily for pipelines that don't use finalization callbacks - // and some tests. - cleanUpThreadStarted = true; - finalizationExecutor.execute(this::cleanupThreadBody, 0); - } - cleanUpQueue.add(info); - @SuppressWarnings("ReferenceEquality") - boolean newMin = cleanUpQueue.peek() == info; - if (newMin) { - queueMinChanged.signal(); - } - } finally { - lock.unlock(); } + } finally { + lock.unlock(); } } @@ -167,8 +147,8 @@ public void finalizeCommits(Iterable finalizeIds) { for (long finalizeId : finalizeIds) { @Nullable FinalizationInfo info = commitFinalizationCallbacks.remove(finalizeId); if (info != null) { - checkState(cleanUpQueue.remove(info)); callbacksToExecute.add(info.getCallback()); + info.getCleanupFuture().cancel(true); } } } finally { @@ -186,10 +166,10 @@ public void finalizeCommits(Iterable finalizeIds) { } @VisibleForTesting - int cleanupQueueSize() { + int pendingCallbacksSize() { lock.lock(); try { - return cleanUpQueue.size(); + return commitFinalizationCallbacks.size(); } finally { lock.unlock(); } diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingWorkScheduler.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingWorkScheduler.java index 4dd8ee3d0c27..1428037d9ca0 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingWorkScheduler.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingWorkScheduler.java @@ -23,6 +23,7 @@ import com.google.auto.value.AutoValue; import java.util.Optional; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.function.Function; import java.util.function.Supplier; @@ -121,6 +122,7 @@ public static StreamingWorkScheduler create( ReaderCache readerCache, DataflowMapTaskExecutorFactory mapTaskExecutorFactory, BoundedQueueExecutor workExecutor, + ScheduledExecutorService commitFinalizerCleanupExecutor, Function stateCacheFactory, FailureTracker failureTracker, WorkFailureProcessor workFailureProcessor, @@ -148,7 +150,7 @@ public static StreamingWorkScheduler create( SideInputStateFetcherFactory.fromOptions(options), failureTracker, workFailureProcessor, - StreamingCommitFinalizer.create(workExecutor), + StreamingCommitFinalizer.create(workExecutor, commitFinalizerCleanupExecutor), streamingCounters, hotKeyLogger, stageInfoMap, diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java index e9df101793cb..5b63e408d790 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java @@ -668,6 +668,45 @@ private Windmill.GetWorkResponse makeInput( Collections.singletonList(DEFAULT_WINDOW))); } + private Windmill.GetWorkResponse makeInput( + int index, long timestamp, String key, long shardingKey, Long finalizeId) throws Exception { + return buildInput( + "work {" + + " computation_id: \"" + + DEFAULT_COMPUTATION_ID + + "\"" + + " input_data_watermark: 0" + + " work {" + + " key: \"" + + key + + "\"" + + " sharding_key: " + + shardingKey + + " work_token: " + + index + + " cache_token: " + + (index + 1) + + " message_bundles {" + + " source_computation_id: \"" + + DEFAULT_SOURCE_COMPUTATION_ID + + "\"" + + " messages {" + + " timestamp: " + + timestamp + + " data: \"data" + + index + + "\"" + + " }" + + " }" + + " }" + + "}" + + "applied_finalize_ids: " + + finalizeId, + CoderUtils.encodeToByteArray( + CollectionCoder.of(IntervalWindow.getCoder()), + Collections.singletonList(DEFAULT_WINDOW))); + } + private Windmill.GetWorkResponse makeInput( int workToken, int cacheToken, long timestamp, String key, long shardingKey) throws Exception { @@ -4277,6 +4316,39 @@ public void testSwitchStreamingWorkerHarness() throws Exception { worker.stop(); } + @Test + public void testBundleFinalizersAreCalled() throws Exception { + List instructions = + Arrays.asList( + makeSourceInstruction(StringUtf8Coder.of()), + makeDoFnInstruction(new BundleFinalizerFn(), 0, StringUtf8Coder.of()), + makeSinkInstruction(StringUtf8Coder.of(), 0)); + + StreamingDataflowWorker worker = + makeWorker( + defaultWorkerParams("--activeWorkRefreshPeriodMillis=10") + .setInstructions(instructions) + .build()); + worker.start(); + + server.whenGetWorkCalled().thenReturn(makeInput(0, 0L, "key", DEFAULT_SHARDING_KEY)); + + Map result = server.waitForAndGetCommits(1); + List finalizeIds = new ArrayList<>(); + for (Windmill.WorkItemCommitRequest commit : result.values()) { + finalizeIds.addAll(commit.getFinalizeIdsList()); + } + assertEquals(1, finalizeIds.size()); + server + .whenGetWorkCalled() + .thenReturn(makeInput(1, 0L, "key", DEFAULT_SHARDING_KEY, finalizeIds.get(0))); + server.waitForAndGetCommits(1); + assertThat( + "At least one commit finalizer called", BundleFinalizerFn.bundleFinalizerCount.get() > 0); + + worker.stop(); + } + private void runNumCommitThreadsTest(int configNumCommitThreads, int expectedNumCommitThreads) { List instructions = Arrays.asList( @@ -4621,6 +4693,18 @@ public void processElement(ProcessContext c) throws Exception { } } + private static class BundleFinalizerFn extends DoFn { + public static AtomicInteger bundleFinalizerCount = new AtomicInteger(); + + @ProcessElement + public void processElement(ProcessContext c, BundleFinalizer bundleFinalizer) { + c.output(c.element()); + bundleFinalizer.afterBundleCommit( + Instant.now().plus(Duration.standardMinutes(5)), + () -> bundleFinalizerCount.incrementAndGet()); + } + } + static class FakeClock implements Supplier { private final PriorityQueue jobs = new PriorityQueue<>(); diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingCommitFinalizerTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingCommitFinalizerTest.java index 7361d0be2cd0..07b4b14fd115 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingCommitFinalizerTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingCommitFinalizerTest.java @@ -29,6 +29,8 @@ import java.util.Collections; import java.util.Map; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import org.apache.beam.repackaged.core.org.apache.commons.lang3.tuple.Pair; import org.apache.beam.runners.dataflow.worker.util.BoundedQueueExecutor; @@ -45,6 +47,7 @@ public class StreamingCommitFinalizerTest { private StreamingCommitFinalizer finalizer; private BoundedQueueExecutor executor; + private ScheduledExecutorService cleanupExecutor; @Before public void setUp() { @@ -60,12 +63,20 @@ public void setUp() { .setDaemon(true) .build(), /*useFairMonitor=*/ false); - finalizer = StreamingCommitFinalizer.create(executor); + + cleanupExecutor = + Executors.newScheduledThreadPool( + 1, + new ThreadFactoryBuilder() + .setNameFormat("FinalizationCallbackCleanup-%d") + .setDaemon(true) + .build()); + finalizer = StreamingCommitFinalizer.create(executor, cleanupExecutor); } @Test public void testCreateAndInit() { - assertEquals(0, finalizer.cleanupQueueSize()); + assertEquals(0, finalizer.pendingCallbacksSize()); } @Test @@ -73,7 +84,7 @@ public void testCacheCommitFinalizer() { Runnable callback = mock(Runnable.class); finalizer.cacheCommitFinalizers( ImmutableMap.of(1L, Pair.of(Instant.now().plus(Duration.standardHours(1)), callback))); - assertEquals(1, finalizer.cleanupQueueSize()); + assertEquals(1, finalizer.pendingCallbacksSize()); verify(callback, never()).run(); } @@ -101,7 +112,7 @@ public void testFinalizeCommits() throws Exception { () -> callbackExecuted.countDown()))); finalizer.finalizeCommits(Collections.singletonList(1L)); assertTrue(callbackExecuted.await(30, TimeUnit.SECONDS)); - assertEquals(0, finalizer.cleanupQueueSize()); + assertEquals(0, finalizer.pendingCallbacksSize()); } @Test @@ -127,7 +138,7 @@ public void testMultipleCommits() throws Exception { finalizer.finalizeCommits(Collections.singletonList(1L)); assertTrue(callback1Executed.await(30, TimeUnit.SECONDS)); - assertEquals(0, finalizer.cleanupQueueSize()); + assertEquals(0, finalizer.pendingCallbacksSize()); } @Test @@ -135,10 +146,10 @@ public void testIgnoresUnknownIds() throws Exception { Runnable callback = mock(Runnable.class); finalizer.cacheCommitFinalizers( ImmutableMap.of(1L, Pair.of(Instant.now().plus(Duration.standardHours(1)), callback))); + assertEquals(1, finalizer.pendingCallbacksSize()); finalizer.finalizeCommits(Collections.singletonList(2L)); - assertEquals(1, executor.elementsOutstanding()); verify(callback, never()).run(); - assertEquals(1, finalizer.cleanupQueueSize()); + assertEquals(1, finalizer.pendingCallbacksSize()); } @Test @@ -150,7 +161,7 @@ public void testCleanupOnExpiration() throws Exception { Pair.of( Instant.now().plus(Duration.standardHours(1)), () -> callback1Executed.countDown()))); - assertEquals(1, finalizer.cleanupQueueSize()); + assertEquals(1, finalizer.pendingCallbacksSize()); Runnable callback2 = mock(Runnable.class); Runnable callback3 = mock(Runnable.class); @@ -161,17 +172,17 @@ public void testCleanupOnExpiration() throws Exception { .put(3L, Pair.of(shortTimeout, callback3)) .build()); - while (finalizer.cleanupQueueSize() > 1) { + while (finalizer.pendingCallbacksSize() > 1) { // Wait until the two 100ms timeouts expire. Thread.sleep(200); } - assertEquals(1, executor.elementsOutstanding()); + assertEquals(1, finalizer.pendingCallbacksSize()); finalizer.finalizeCommits(ImmutableList.of(2L, 3L)); verify(callback2, never()).run(); verify(callback3, never()).run(); finalizer.finalizeCommits(Collections.singletonList(1L)); assertTrue(callback1Executed.await(30, TimeUnit.SECONDS)); - assertEquals(0, finalizer.cleanupQueueSize()); + assertEquals(0, finalizer.pendingCallbacksSize()); } }