-
Notifications
You must be signed in to change notification settings - Fork 4.5k
Changes cleanup thread to forceExecute. #38073
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from 3 commits
46f8236
19b704f
f823ffa
19e3225
a47758a
ba3533e
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -182,6 +182,7 @@ public final class StreamingDataflowWorker { | |
| private final ComputationConfig.Fetcher configFetcher; | ||
| private final ComputationStateCache computationStateCache; | ||
| private final BoundedQueueExecutor workUnitExecutor; | ||
| private final ScheduledExecutorService commitFinalizerCleanupExecutor; | ||
| private final AtomicReference<StreamingWorkerHarness> streamingWorkerHarness = | ||
| new AtomicReference<>(); | ||
| private final AtomicBoolean running = new AtomicBoolean(); | ||
|
|
@@ -208,6 +209,7 @@ private StreamingDataflowWorker( | |
| ComputationStateCache computationStateCache, | ||
| WindmillStateCache windmillStateCache, | ||
| BoundedQueueExecutor workUnitExecutor, | ||
| ScheduledExecutorService commitFinalizerCleanupExecutor, | ||
| DataflowMapTaskExecutorFactory mapTaskExecutorFactory, | ||
| DataflowWorkerHarnessOptions options, | ||
| HotKeyLogger hotKeyLogger, | ||
|
|
@@ -232,6 +234,7 @@ private StreamingDataflowWorker( | |
| Executors.newCachedThreadPool()); | ||
| this.options = options; | ||
| this.workUnitExecutor = workUnitExecutor; | ||
| this.commitFinalizerCleanupExecutor = commitFinalizerCleanupExecutor; | ||
| this.harnessSwitchExecutor = | ||
| Executors.newSingleThreadExecutor( | ||
| new ThreadFactoryBuilder().setNameFormat("HarnessSwitchExecutor").build()); | ||
|
|
@@ -252,6 +255,7 @@ private StreamingDataflowWorker( | |
| readerCache, | ||
| mapTaskExecutorFactory, | ||
| workUnitExecutor, | ||
| commitFinalizerCleanupExecutor, | ||
| this.stateCache::forComputation, | ||
| failureTracker, | ||
| workFailureProcessor, | ||
|
|
@@ -616,6 +620,13 @@ public static StreamingDataflowWorker fromOptions(DataflowWorkerHarnessOptions o | |
| StreamingCounters streamingCounters = StreamingCounters.create(); | ||
| WorkUnitClient dataflowServiceClient = new DataflowWorkUnitClient(options, LOG); | ||
| BoundedQueueExecutor workExecutor = createWorkUnitExecutor(options); | ||
| ScheduledExecutorService commitFinalizerCleanupExecutor = | ||
| Executors.newScheduledThreadPool( | ||
| 10, | ||
| new ThreadFactoryBuilder() | ||
| .setNameFormat("FinalizationCallbackCleanup-%d") | ||
| .setDaemon(true) | ||
| .build()); | ||
| WindmillStateCache windmillStateCache = | ||
| WindmillStateCache.builder() | ||
| .setSizeMb(options.getWorkerCacheMb()) | ||
|
|
@@ -680,6 +691,7 @@ public static StreamingDataflowWorker fromOptions(DataflowWorkerHarnessOptions o | |
| computationStateCache, | ||
| windmillStateCache, | ||
| workExecutor, | ||
| commitFinalizerCleanupExecutor, | ||
| IntrinsicMapTaskExecutorFactory.defaultFactory(), | ||
| options, | ||
| new HotKeyLogger(), | ||
|
|
@@ -842,6 +854,13 @@ static StreamingDataflowWorker forTesting( | |
| WindmillStubFactoryFactory stubFactory) { | ||
| ConcurrentMap<String, StageInfo> stageInfo = new ConcurrentHashMap<>(); | ||
| BoundedQueueExecutor workExecutor = createWorkUnitExecutor(options); | ||
| ScheduledExecutorService commitFinalizerCleanupExecutor = | ||
| Executors.newScheduledThreadPool( | ||
| 10, | ||
|
||
| new ThreadFactoryBuilder() | ||
| .setNameFormat("FinalizationCallbackCleanup-%d") | ||
| .setDaemon(true) | ||
| .build()); | ||
| WindmillStateCache stateCache = | ||
| WindmillStateCache.builder() | ||
| .setSizeMb(options.getWorkerCacheMb()) | ||
|
|
@@ -930,6 +949,7 @@ static StreamingDataflowWorker forTesting( | |
| computationStateCache, | ||
| stateCache, | ||
| workExecutor, | ||
| commitFinalizerCleanupExecutor, | ||
| mapTaskExecutorFactory, | ||
| options, | ||
| hotKeyLogger, | ||
|
|
@@ -1121,6 +1141,7 @@ void stop() { | |
| streamingWorkerHarness.get().shutdown(); | ||
| memoryMonitor.shutdown(); | ||
| workUnitExecutor.shutdown(); | ||
| commitFinalizerCleanupExecutor.shutdown(); | ||
| computationStateCache.closeAndInvalidateAll(); | ||
| workerStatusReporter.stop(); | ||
| } catch (Exception e) { | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -17,17 +17,14 @@ | |
| */ | ||
| package org.apache.beam.runners.dataflow.worker.windmill.work.processing; | ||
|
|
||
| import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState; | ||
|
|
||
| import com.google.auto.value.AutoValue; | ||
| import java.util.ArrayList; | ||
| import java.util.Comparator; | ||
| import java.util.HashMap; | ||
| import java.util.List; | ||
| import java.util.Map; | ||
| import java.util.PriorityQueue; | ||
| import java.util.concurrent.ScheduledExecutorService; | ||
| import java.util.concurrent.ScheduledFuture; | ||
| import java.util.concurrent.TimeUnit; | ||
| import java.util.concurrent.locks.Condition; | ||
| import java.util.concurrent.locks.ReentrantLock; | ||
| import javax.annotation.Nullable; | ||
| import javax.annotation.concurrent.GuardedBy; | ||
|
|
@@ -56,95 +53,79 @@ public abstract static class FinalizationInfo { | |
|
|
||
| public abstract Runnable getCallback(); | ||
|
|
||
| public static FinalizationInfo create(Long id, Instant expiryTime, Runnable callback) { | ||
| return new AutoValue_StreamingCommitFinalizer_FinalizationInfo(id, expiryTime, callback); | ||
| public abstract ScheduledFuture<?> getCleanupFuture(); | ||
|
|
||
| public static FinalizationInfo create( | ||
| Long id, Instant expiryTime, Runnable callback, ScheduledFuture<?> cleanupFuture) { | ||
| return new AutoValue_StreamingCommitFinalizer_FinalizationInfo( | ||
| id, expiryTime, callback, cleanupFuture); | ||
| } | ||
| } | ||
|
|
||
| private final ReentrantLock lock = new ReentrantLock(); | ||
| private final Condition queueMinChanged = lock.newCondition(); | ||
|
|
||
| @GuardedBy("lock") | ||
| private final HashMap<Long, FinalizationInfo> commitFinalizationCallbacks = new HashMap<>(); | ||
|
|
||
| @GuardedBy("lock") | ||
| private final PriorityQueue<FinalizationInfo> cleanUpQueue = | ||
| new PriorityQueue<>(11, Comparator.comparing(FinalizationInfo::getExpiryTime)); | ||
|
|
||
| @GuardedBy("lock") | ||
| private boolean cleanUpThreadStarted = false; | ||
|
|
||
| private final BoundedQueueExecutor finalizationExecutor; | ||
|
|
||
| private StreamingCommitFinalizer(BoundedQueueExecutor finalizationCleanupExecutor) { | ||
| finalizationExecutor = finalizationCleanupExecutor; | ||
| } | ||
|
|
||
| private void cleanupThreadBody() { | ||
| lock.lock(); | ||
| try { | ||
| while (true) { | ||
| final @Nullable FinalizationInfo minValue = cleanUpQueue.peek(); | ||
| if (minValue == null) { | ||
| // Wait for an element to be added and loop to re-examine the min. | ||
| queueMinChanged.await(); | ||
| continue; | ||
| } | ||
| // The cleanup threads run in their own Executor, so they don't block processing. | ||
| private final ScheduledExecutorService cleanupExecutor; | ||
|
|
||
| Instant now = Instant.now(); | ||
| Duration timeDifference = new Duration(now, minValue.getExpiryTime()); | ||
| if (timeDifference.getMillis() < 0 | ||
| || (queueMinChanged.await(timeDifference.getMillis(), TimeUnit.MILLISECONDS) | ||
| && cleanUpQueue.peek() == minValue)) { | ||
| // The minimum element has an expiry time before now, either because it had elapsed when | ||
| // we pulled it or because we awaited it, and it is still the minimum. | ||
| checkState(minValue == cleanUpQueue.poll()); | ||
| checkState(commitFinalizationCallbacks.remove(minValue.getId()) == minValue); | ||
| } | ||
| } | ||
| } catch (InterruptedException e) { | ||
| // We're being shutdown. | ||
| } finally { | ||
| lock.unlock(); | ||
| } | ||
| private StreamingCommitFinalizer( | ||
| BoundedQueueExecutor finalizationExecutor, ScheduledExecutorService cleanupExecutor) { | ||
| this.finalizationExecutor = finalizationExecutor; | ||
| this.cleanupExecutor = cleanupExecutor; | ||
| } | ||
|
|
||
| static StreamingCommitFinalizer create(BoundedQueueExecutor workExecutor) { | ||
| return new StreamingCommitFinalizer(workExecutor); | ||
| static StreamingCommitFinalizer create( | ||
| BoundedQueueExecutor workExecutor, ScheduledExecutorService cleanupExecutor) { | ||
| return new StreamingCommitFinalizer(workExecutor, cleanupExecutor); | ||
| } | ||
|
|
||
| /** | ||
| * Stores a map of user worker generated finalization ids and callbacks to execute once a commit | ||
| * has been successfully committed to the backing state store. | ||
| */ | ||
| public void cacheCommitFinalizers(Map<Long, Pair<Instant, Runnable>> callbacks) { | ||
| List<FinalizationInfo> finalizeInfos = new ArrayList<>(); | ||
| Instant now = Instant.now(); | ||
| for (Map.Entry<Long, Pair<Instant, Runnable>> entry : callbacks.entrySet()) { | ||
| Long finalizeId = entry.getKey(); | ||
| final FinalizationInfo info = | ||
| FinalizationInfo.create( | ||
| finalizeId, entry.getValue().getLeft(), entry.getValue().getRight()); | ||
|
|
||
| Instant cleanupTime = entry.getValue().getLeft(); | ||
| // Ignore finalizers that have already expired. | ||
| if (cleanupTime.isAfter(now)) { | ||
| ScheduledFuture<?> cleanupFuture = | ||
| cleanupExecutor.schedule( | ||
| () -> { | ||
| lock.lock(); | ||
| try { | ||
| commitFinalizationCallbacks.remove(entry.getKey()); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There is a race here and this can execute before We can move cleanupExecutor.schedule under |
||
| } finally { | ||
| lock.unlock(); | ||
| } | ||
| }, | ||
| new Duration(now, cleanupTime).getMillis(), | ||
| TimeUnit.MILLISECONDS); | ||
| finalizeInfos.add( | ||
| FinalizationInfo.create( | ||
| entry.getKey(), | ||
| entry.getValue().getLeft(), | ||
| entry.getValue().getRight(), | ||
| cleanupFuture)); | ||
| } | ||
| } | ||
| if (!finalizeInfos.isEmpty()) { | ||
| lock.lock(); | ||
| try { | ||
| FinalizationInfo existingInfo = commitFinalizationCallbacks.put(finalizeId, info); | ||
| if (existingInfo != null) { | ||
| throw new IllegalStateException( | ||
| "Expected to not have any past callbacks for bundle " | ||
| + finalizeId | ||
| + " but had " | ||
| + existingInfo); | ||
| } | ||
| if (!cleanUpThreadStarted) { | ||
| // Start the cleanup thread lazily for pipelines that don't use finalization callbacks | ||
| // and some tests. | ||
| cleanUpThreadStarted = true; | ||
| finalizationExecutor.execute(this::cleanupThreadBody, 0); | ||
| } | ||
| cleanUpQueue.add(info); | ||
| @SuppressWarnings("ReferenceEquality") | ||
| boolean newMin = cleanUpQueue.peek() == info; | ||
| if (newMin) { | ||
| queueMinChanged.signal(); | ||
| for (FinalizationInfo info : finalizeInfos) { | ||
| FinalizationInfo existingInfo = commitFinalizationCallbacks.put(info.getId(), info); | ||
| if (existingInfo != null) { | ||
| throw new IllegalStateException( | ||
| "Expected to not have any past callbacks for bundle " | ||
| + info.getId() | ||
| + " but had " | ||
| + existingInfo); | ||
| } | ||
| } | ||
| } finally { | ||
| lock.unlock(); | ||
|
|
@@ -167,8 +148,8 @@ public void finalizeCommits(Iterable<Long> finalizeIds) { | |
| for (long finalizeId : finalizeIds) { | ||
| @Nullable FinalizationInfo info = commitFinalizationCallbacks.remove(finalizeId); | ||
| if (info != null) { | ||
| checkState(cleanUpQueue.remove(info)); | ||
| callbacksToExecute.add(info.getCallback()); | ||
| info.getCleanupFuture().cancel(true); | ||
| } | ||
| } | ||
| } finally { | ||
|
|
@@ -186,10 +167,10 @@ public void finalizeCommits(Iterable<Long> finalizeIds) { | |
| } | ||
|
|
||
| @VisibleForTesting | ||
| int cleanupQueueSize() { | ||
| int pendingCallbacksSize() { | ||
| lock.lock(); | ||
| try { | ||
| return cleanUpQueue.size(); | ||
| return commitFinalizationCallbacks.size(); | ||
| } finally { | ||
| lock.unlock(); | ||
| } | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
since the callbacks are quick, 1 thread here should be enough?