diff --git a/backends-velox/src/main/scala/org/apache/gluten/config/VeloxConfig.scala b/backends-velox/src/main/scala/org/apache/gluten/config/VeloxConfig.scala index 633979613d85..325b2e01cd0b 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/config/VeloxConfig.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/config/VeloxConfig.scala @@ -97,6 +97,15 @@ class VeloxConfig(conf: SQLConf) extends GlutenConfig(conf) { def hashProbeDynamicFilterPushdownEnabled: Boolean = getConf(HASH_PROBE_DYNAMIC_FILTER_PUSHDOWN_ENABLED) + def parallelExecutionEnabled: Boolean = + getConf(PARALLEL_EXECUTION_ENABLED) + + def parallelExecutionThreadPoolSize: Option[Int] = + getConf(PARALLEL_EXECUTION_THREAD_POOL_SIZE) + + def parallelExecutionMaxDrivers: Int = + getConf(PARALLEL_EXECUTION_MAX_DRIVERS) + def valueStreamDynamicFilterEnabled: Boolean = getConf(VALUE_STREAM_DYNAMIC_FILTER_ENABLED) } @@ -470,6 +479,32 @@ object VeloxConfig extends ConfigRegistry { .booleanConf .createWithDefault(true) + val PARALLEL_EXECUTION_ENABLED = + buildStaticConf("spark.gluten.sql.columnar.backend.velox.parallelExecution.enabled") + .doc( + "Whether to enable parallel execution of Velox task drivers for whole-stage execution. " + + "Default is false (serial execution).") + .booleanConf + .createWithDefault(false) + + val PARALLEL_EXECUTION_THREAD_POOL_SIZE = + buildStaticConf("spark.gluten.sql.columnar.backend.velox.parallelExecution.threadPoolSize") + .doc( + "Size of the thread pool used for parallel execution of Velox task drivers. " + + "If not set, defaults to 2 * spark.gluten.numTaskSlotsPerExecutor.") + .intConf + .checkValue(_ > 0, "must be a positive number") + .createOptional + + val PARALLEL_EXECUTION_MAX_DRIVERS = + buildConf("spark.gluten.sql.columnar.backend.velox.parallelExecution.maxDrivers") + .doc( + "Maximum number of parallel Velox task drivers to use for whole-stage execution. 
" + + "Default is 4.") + .intConf + .checkValue(_ > 0, "must be a positive number") + .createWithDefault(4) + val VALUE_STREAM_DYNAMIC_FILTER_ENABLED = buildConf("spark.gluten.sql.columnar.backend.velox.valueStream.dynamicFilter.enabled") .doc( diff --git a/backends-velox/src/main/scala/org/apache/gluten/metrics/MetricsUtil.scala b/backends-velox/src/main/scala/org/apache/gluten/metrics/MetricsUtil.scala index 09430fdd70ec..1e73ef9fc4fc 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/metrics/MetricsUtil.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/metrics/MetricsUtil.scala @@ -267,7 +267,7 @@ object MetricsUtil extends Logging { metrics.getSingleMetrics, joinParamsMap.get(operatorIdx)) case u: UnionMetricsUpdater => - // JoinRel outputs two suites of metrics respectively for hash build and hash probe. + // Union outputs two suites of metrics respectively. // Therefore, fetch one more suite of metrics here. operatorMetrics.add(metrics.getOperatorMetrics(curMetricsIdx)) curMetricsIdx -= 1 @@ -364,7 +364,7 @@ object MetricsUtil extends Logging { } } catch { case e: Exception => - logWarning(s"Updating native metrics failed due to ${e.getCause}.") + logWarning(s"Updating native metrics failed: ${e.getMessage}", e) () } } diff --git a/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxTPCHSuite.scala b/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxTPCHSuite.scala index 12807448c7e0..b23728795422 100644 --- a/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxTPCHSuite.scala +++ b/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxTPCHSuite.scala @@ -373,7 +373,7 @@ class VeloxTPCHV2BhjSuite extends VeloxTPCHSuite { } } -class VeloxPartitionedTableTPCHSuite extends VeloxTPCHSuite { +class VeloxTPCHPartitionedTableSuite extends VeloxTPCHSuite { override def subType(): String = "partitioned" override protected def sparkConf: SparkConf = { @@ -427,3 +427,26 @@ class 
VeloxTPCHV1VanillaBhjGlutenBeSuite extends VeloxTPCHSuite { .set(GlutenConfig.COLUMNAR_BROADCAST_EXCHANGE_ENABLED.key, "true") } } + +class VeloxTPCHV1ParallelExecutionSuite extends VeloxTPCHSuite { + override def subType(): String = "v1-parallel" + + override protected def sparkConf: SparkConf = { + super.sparkConf + .set("spark.sql.sources.useV1SourceList", "parquet") + .set("spark.sql.autoBroadcastJoinThreshold", "-1") + .set(VeloxConfig.PARALLEL_EXECUTION_ENABLED.key, "true") + } +} + +class VeloxTPCHV1ParallelExecutionBhjSuite extends VeloxTPCHSuite { + override def subType(): String = "v1-parallel-bhj" + + override protected def sparkConf: SparkConf = { + super.sparkConf + .set("spark.sql.sources.useV1SourceList", "parquet") + .set("spark.sql.autoBroadcastJoinThreshold", "-1") + .set(VeloxConfig.PARALLEL_EXECUTION_ENABLED.key, "true") + .set("spark.sql.autoBroadcastJoinThreshold", "30M") + } +} diff --git a/cpp/core/CMakeLists.txt b/cpp/core/CMakeLists.txt index 58dd301b6968..d95fc50e6843 100644 --- a/cpp/core/CMakeLists.txt +++ b/cpp/core/CMakeLists.txt @@ -128,6 +128,8 @@ set(SPARK_COLUMNAR_PLUGIN_SRCS memory/MemoryManager.cc memory/ArrowMemoryPool.cc memory/ColumnarBatch.cc + threads/ThreadInitializer.cc + threads/ThreadManager.cc shuffle/Dictionary.cc shuffle/FallbackRangePartitioner.cc shuffle/HashPartitioner.cc diff --git a/cpp/core/compute/Runtime.cc b/cpp/core/compute/Runtime.cc index f5b4ed0f334b..36502d206cad 100644 --- a/cpp/core/compute/Runtime.cc +++ b/cpp/core/compute/Runtime.cc @@ -40,9 +40,10 @@ void Runtime::registerFactory(const std::string& kind, Runtime::Factory factory, Runtime* Runtime::create( const std::string& kind, MemoryManager* memoryManager, + ThreadManager* threadManager, const std::unordered_map& sessionConf) { auto& factory = runtimeFactories().get(kind); - return factory(kind, std::move(memoryManager), sessionConf); + return factory(kind, std::move(memoryManager), std::move(threadManager), sessionConf); } void 
Runtime::release(Runtime* runtime) { diff --git a/cpp/core/compute/Runtime.h b/cpp/core/compute/Runtime.h index 9d8315731fc5..4acd9be74ce0 100644 --- a/cpp/core/compute/Runtime.h +++ b/cpp/core/compute/Runtime.h @@ -30,6 +30,7 @@ #include "shuffle/ShuffleReader.h" #include "shuffle/ShuffleWriter.h" #include "substrait/plan.pb.h" +#include "threads/ThreadManager.h" #include "utils/ObjectStore.h" #include "utils/WholeStageDumper.h" @@ -61,12 +62,14 @@ class Runtime : public std::enable_shared_from_this { using Factory = std::function& sessionConf)>; using Releaser = std::function; static void registerFactory(const std::string& kind, Factory factory, Releaser releaser); static Runtime* create( const std::string& kind, MemoryManager* memoryManager, + ThreadManager* threadManager, const std::unordered_map& sessionConf = {}); static void release(Runtime*); static std::optional* localWriteFilesTempPath(); @@ -75,8 +78,9 @@ class Runtime : public std::enable_shared_from_this { Runtime( const std::string& kind, MemoryManager* memoryManager, + ThreadManager* threadManager, const std::unordered_map& confMap) - : kind_(kind), memoryManager_(memoryManager), confMap_(confMap) {} + : kind_(kind), memoryManager_(memoryManager), threadManager_(threadManager), confMap_(confMap) {} virtual ~Runtime() = default; @@ -126,6 +130,10 @@ class Runtime : public std::enable_shared_from_this { return memoryManager_; }; + virtual ThreadManager* threadManager() { + return threadManager_; + }; + /// This function is used to create certain converter from the format used by /// the backend to Spark unsafe row. 
virtual std::shared_ptr createColumnar2RowConverter(int64_t column2RowMemThreshold) { @@ -184,6 +192,7 @@ class Runtime : public std::enable_shared_from_this { protected: std::string kind_; MemoryManager* memoryManager_; + ThreadManager* threadManager_; std::unique_ptr objStore_ = ObjectStore::create(); std::unordered_map confMap_; // Session conf map diff --git a/cpp/core/jni/JniCommon.cc b/cpp/core/jni/JniCommon.cc index 9d435bb59239..96a12913acd4 100644 --- a/cpp/core/jni/JniCommon.cc +++ b/cpp/core/jni/JniCommon.cc @@ -16,6 +16,7 @@ */ #include "JniCommon.h" +#include void gluten::JniCommonState::ensureInitialized(JNIEnv* env) { std::lock_guard lockGuard(mtx_); @@ -95,7 +96,6 @@ gluten::JniColumnarBatchIterator::~JniColumnarBatchIterator() { attachCurrentThreadAsDaemonOrThrow(vm_, &env); env->DeleteGlobalRef(jColumnarBatchItr_); env->DeleteGlobalRef(serializedColumnarBatchIteratorClass_); - vm_->DetachCurrentThread(); } std::shared_ptr gluten::JniColumnarBatchIterator::next() { @@ -116,7 +116,6 @@ std::shared_ptr gluten::JniColumnarBatchIterator::next() std::shared_ptr gluten::JniColumnarBatchIterator::nextInternal() const { JNIEnv* env = nullptr; attachCurrentThreadAsDaemonOrThrow(vm_, &env); - if (!env->CallBooleanMethod(jColumnarBatchItr_, serializedColumnarBatchIteratorHasNext_)) { checkException(env); return nullptr; // stream ended diff --git a/cpp/core/jni/JniCommon.h b/cpp/core/jni/JniCommon.h index 783f8edfcc5d..553d75c3ed0b 100644 --- a/cpp/core/jni/JniCommon.h +++ b/cpp/core/jni/JniCommon.h @@ -26,6 +26,7 @@ #include "compute/Runtime.h" #include "memory/AllocationListener.h" #include "shuffle/rss/RssClient.h" +#include "threads/ThreadInitializer.h" #include "utils/Compression.h" #include "utils/Exception.h" #include "utils/ResourceMap.h" @@ -463,7 +464,7 @@ class SparkAllocationListener final : public gluten::AllocationListener { }; class BacktraceAllocationListener final : public gluten::AllocationListener { - public: +public: 
BacktraceAllocationListener(std::unique_ptr delegator) : delegator_(std::move(delegator)) {} @@ -549,3 +550,73 @@ class JavaRssClient : public RssClient { jmethodID javaPushPartitionData_; jbyteArray array_; }; + +class SparkThreadInitializer final : public gluten::ThreadInitializer { + public: + SparkThreadInitializer(JavaVM* vm, jobject jInitializerLocalRef) : vm_(vm) { + JNIEnv* env; + attachCurrentThreadAsDaemonOrThrow(vm_, &env); + jInitializerGlobalRef_ = env->NewGlobalRef(jInitializerLocalRef); + GLUTEN_CHECK(jInitializerGlobalRef_ != nullptr, "Failed to create global reference for native thread initializer."); + (void)initializeMethod(env); + } + + SparkThreadInitializer(const SparkThreadInitializer&) = delete; + SparkThreadInitializer(SparkThreadInitializer&&) = delete; + SparkThreadInitializer& operator=(const SparkThreadInitializer&) = delete; + SparkThreadInitializer& operator=(SparkThreadInitializer&&) = delete; + + ~SparkThreadInitializer() override { + JNIEnv* env; + if (vm_->GetEnv(reinterpret_cast(&env), jniVersion) != JNI_OK) { + LOG(WARNING) << "SparkThreadInitializer#~SparkThreadInitializer(): " + << "JNIEnv was not attached to current thread"; + return; + } + env->DeleteGlobalRef(jInitializerGlobalRef_); + } + + void initialize(const std::string& threadName) override { + JNIEnv* env; + attachCurrentThreadAsDaemonOrThrow(vm_, &env); + jstring jThreadName = env->NewStringUTF(threadName.c_str()); + env->CallVoidMethod(jInitializerGlobalRef_, initializeMethod(env), jThreadName); + env->DeleteLocalRef(jThreadName); + checkException(env); + } + + void destroy(const std::string& threadName) override { + // IMPORTANT: Do not call vm_.DetachCurrentThread here, otherwise Java side thread + // object might be dereferenced and garbage-collected, to break the reuse of thread + // resources. 
+ JNIEnv* env; + attachCurrentThreadAsDaemonOrThrow(vm_, &env); + jstring jThreadName = env->NewStringUTF(threadName.c_str()); + env->CallVoidMethod(jInitializerGlobalRef_, destroyMethod(env), jThreadName); + env->DeleteLocalRef(jThreadName); + checkException(env); + } + + private: + jmethodID initializeMethod(JNIEnv* env) { + static jmethodID initializeMethod = + getMethodIdOrError(env, nativeThreadInitializerClass(env), "initialize", "(Ljava/lang/String;)V"); + return initializeMethod; + } + + jmethodID destroyMethod(JNIEnv* env) { + static jmethodID destroyMethod = + getMethodIdOrError(env, nativeThreadInitializerClass(env), "destroy", "(Ljava/lang/String;)V"); + return destroyMethod; + } + + jclass nativeThreadInitializerClass(JNIEnv* env) { + static jclass javaInitializerClass = + createGlobalClassReferenceOrError(env, "Lorg/apache/gluten/threads/NativeThreadInitializer;"); + return javaInitializerClass; + } + + private: + JavaVM* vm_; + jobject jInitializerGlobalRef_; +}; diff --git a/cpp/core/jni/JniWrapper.cc b/cpp/core/jni/JniWrapper.cc index c8cd3adef457..8ff1cce80c7a 100644 --- a/cpp/core/jni/JniWrapper.cc +++ b/cpp/core/jni/JniWrapper.cc @@ -103,7 +103,6 @@ class JavaInputStreamAdaptor final : public arrow::io::InputStream { env->CallVoidMethod(jniIn_, jniByteInputStreamClose); checkException(env); env->DeleteGlobalRef(jniIn_); - vm_->DetachCurrentThread(); closed_ = true; return arrow::Status::OK(); } @@ -177,8 +176,9 @@ class InternalRuntime : public Runtime { InternalRuntime( const std::string& kind, MemoryManager* memoryManager, + ThreadManager* threadManager, const std::unordered_map& confMap) - : Runtime(kind, memoryManager, confMap) {} + : Runtime(kind, memoryManager, threadManager, confMap) {} }; MemoryManager* internalMemoryManagerFactory(const std::string& kind, std::unique_ptr listener) { @@ -189,11 +189,33 @@ void internalMemoryManagerReleaser(MemoryManager* memoryManager) { delete memoryManager; } +class InternalThreadManager : public 
ThreadManager { + public: + InternalThreadManager(const std::string& kind, std::unique_ptr initializer) + : ThreadManager(kind), initializer_(std::shared_ptr(std::move(initializer))) {} + + ThreadInitializer* getThreadInitializer() override { + return initializer_.get(); + } + + private: + std::shared_ptr initializer_; +}; + +ThreadManager* internalThreadManagerFactory(const std::string& kind, std::unique_ptr initializer) { + return new InternalThreadManager(kind, std::move(initializer)); +} + +void internalThreadManagerReleaser(ThreadManager* threadManager) { + delete threadManager; +} + Runtime* internalRuntimeFactory( const std::string& kind, MemoryManager* memoryManager, + ThreadManager* threadManager, const std::unordered_map& sessionConf) { - return new InternalRuntime(kind, memoryManager, sessionConf); + return new InternalRuntime(kind, memoryManager, threadManager, sessionConf); } void internalRuntimeReleaser(Runtime* runtime) { @@ -248,6 +270,7 @@ jint JNI_OnLoad(JavaVM* vm, void* reserved) { getJniErrorState()->ensureInitialized(env); MemoryManager::registerFactory(kInternalBackendKind, internalMemoryManagerFactory, internalMemoryManagerReleaser); + ThreadManager::registerFactory(kInternalBackendKind, internalThreadManagerFactory, internalThreadManagerReleaser); Runtime::registerFactory(kInternalBackendKind, internalRuntimeFactory, internalRuntimeReleaser); byteArrayClass = createGlobalClassReferenceOrError(env, "[B"); @@ -315,14 +338,16 @@ JNIEXPORT jlong JNICALL Java_org_apache_gluten_runtime_RuntimeJniWrapper_createR jclass, jstring jBackendType, jlong nmmHandle, + jlong ntmHandle, jbyteArray sessionConf) { JNI_METHOD_START MemoryManager* memoryManager = jniCastOrThrow(nmmHandle); + ThreadManager* threadManager = jniCastOrThrow(ntmHandle); auto safeArray = getByteArrayElementsSafe(env, sessionConf); auto sparkConf = parseConfMap(env, safeArray.elems(), safeArray.length()); auto backendType = jStringToCString(env, jBackendType); - auto runtime = 
Runtime::create(backendType, memoryManager, sparkConf); + auto runtime = Runtime::create(backendType, memoryManager, threadManager, sparkConf); return reinterpret_cast(runtime); JNI_METHOD_END(kInvalidObjectHandle) } @@ -366,6 +391,33 @@ JNIEXPORT jlong JNICALL Java_org_apache_gluten_memory_NativeMemoryManagerJniWrap JNI_METHOD_END(-1L) } +JNIEXPORT jlong JNICALL Java_org_apache_gluten_threads_NativeThreadManagerJniWrapper_create( // NOLINT + JNIEnv* env, + jclass, + jstring jBackendType, + jobject jInitializer) { + JNI_METHOD_START + JavaVM* vm; + if (env->GetJavaVM(&vm) != JNI_OK) { + throw GlutenException("Unable to get JavaVM instance"); + } + auto backendType = jStringToCString(env, jBackendType); + std::unique_ptr initializer = std::make_unique(vm, jInitializer); + ThreadManager* tm = ThreadManager::create(backendType, std::move(initializer)); + return reinterpret_cast(tm); + JNI_METHOD_END(-1L) +} + +JNIEXPORT void JNICALL Java_org_apache_gluten_threads_NativeThreadManagerJniWrapper_release( // NOLINT + JNIEnv* env, + jclass, + jlong ntmHandle) { + JNI_METHOD_START + auto* threadManager = jniCastOrThrow(ntmHandle); + ThreadManager::release(threadManager); + JNI_METHOD_END() +} + JNIEXPORT jbyteArray JNICALL Java_org_apache_gluten_memory_NativeMemoryManagerJniWrapper_collectUsage( // NOLINT JNIEnv* env, jclass, diff --git a/cpp/core/threads/ThreadInitializer.cc b/cpp/core/threads/ThreadInitializer.cc new file mode 100644 index 000000000000..2db757b4d110 --- /dev/null +++ b/cpp/core/threads/ThreadInitializer.cc @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "ThreadInitializer.h" + +namespace gluten { +namespace { + +class NoopThreadInitializer final : public ThreadInitializer { + public: + void initialize(const std::string& threadName) override {} + void destroy(const std::string& threadName) override {}; +}; + +} // namespace + +std::unique_ptr ThreadInitializer::noop() { + return std::make_unique(); +} + +} // namespace gluten diff --git a/cpp/core/threads/ThreadInitializer.h b/cpp/core/threads/ThreadInitializer.h new file mode 100644 index 000000000000..fa0271141bfc --- /dev/null +++ b/cpp/core/threads/ThreadInitializer.h @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#pragma once + +#include + +namespace gluten { + +class ThreadInitializer { + public: + static std::unique_ptr noop(); + + virtual ~ThreadInitializer() = default; + + virtual void initialize(const std::string& threadName) = 0; + + virtual void destroy(const std::string& threadName) = 0; + + protected: + ThreadInitializer() = default; +}; + +} // namespace gluten diff --git a/cpp/core/threads/ThreadManager.cc b/cpp/core/threads/ThreadManager.cc new file mode 100644 index 000000000000..20a377b9a03d --- /dev/null +++ b/cpp/core/threads/ThreadManager.cc @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include "ThreadManager.h" + +#include "utils/Registry.h" + +namespace gluten { +namespace { + +Registry& threadManagerFactories() { + static Registry registry; + return registry; +} + +Registry& threadManagerReleasers() { + static Registry registry; + return registry; +} + +} // namespace + +void ThreadManager::registerFactory(const std::string& kind, Factory factory, Releaser releaser) { + threadManagerFactories().registerObj(kind, std::move(factory)); + threadManagerReleasers().registerObj(kind, std::move(releaser)); +} + +ThreadManager* ThreadManager::create(const std::string& kind, std::unique_ptr initializer) { + auto& factory = threadManagerFactories().get(kind); + return factory(kind, std::move(initializer)); +} + +void ThreadManager::release(ThreadManager* threadManager) { + const std::string kind = threadManager->kind(); + auto& releaser = threadManagerReleasers().get(kind); + releaser(threadManager); +} + +} // namespace gluten diff --git a/cpp/core/threads/ThreadManager.h b/cpp/core/threads/ThreadManager.h new file mode 100644 index 000000000000..ff4f160ef351 --- /dev/null +++ b/cpp/core/threads/ThreadManager.h @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#pragma once + +#include +#include +#include + +#include "threads/ThreadInitializer.h" + +namespace gluten { + +class ThreadManager { + public: + using Factory = std::function initializer)>; + using Releaser = std::function; + + static void registerFactory(const std::string& kind, Factory factory, Releaser releaser); + static ThreadManager* create(const std::string& kind, std::unique_ptr initializer); + static void release(ThreadManager* threadManager); + + explicit ThreadManager(const std::string& kind) : kind_(kind) {} + + virtual ~ThreadManager() = default; + + virtual std::string kind() { + return kind_; + } + + virtual ThreadInitializer* getThreadInitializer() = 0; + + private: + std::string kind_; +}; + +} // namespace gluten diff --git a/cpp/velox/benchmarks/GenericBenchmark.cc b/cpp/velox/benchmarks/GenericBenchmark.cc index 616ba9bcfbde..1e00eadd8483 100644 --- a/cpp/velox/benchmarks/GenericBenchmark.cc +++ b/cpp/velox/benchmarks/GenericBenchmark.cc @@ -38,6 +38,7 @@ #include "tests/utils/LocalRssClient.h" #include "tests/utils/TestAllocationListener.h" #include "tests/utils/TestStreamReader.h" +#include "threads/ThreadInitializer.h" #include "utils/Exception.h" #include "utils/StringUtil.h" #include "utils/Timer.h" @@ -382,7 +383,7 @@ void setQueryTraceConfig(std::unordered_map& configs) } } // namespace -using RuntimeFactory = std::function; +using RuntimeFactory = std::function; auto BM_Generic = [](::benchmark::State& state, const std::string& planFile, @@ -398,7 +399,8 @@ auto BM_Generic = [](::benchmark::State& state, auto* listenerPtr = listener.get(); auto* memoryManager = MemoryManager::create(kVeloxBackendKind, std::move(listener)); - auto runtime = runtimeFactory(memoryManager); + auto* threadManager = ThreadManager::create(kVeloxBackendKind, ThreadInitializer::noop()); + auto runtime = runtimeFactory(memoryManager, threadManager); auto plan = getPlanFromFile("Plan", planFile); std::vector splits{}; @@ -507,6 +509,7 @@ auto BM_Generic = 
[](::benchmark::State& state, updateBenchmarkMetrics(state, elapsedTime, readInputTime, writerMetrics, readerMetrics); Runtime::release(runtime); + ThreadManager::release(threadManager); MemoryManager::release(memoryManager); }; @@ -522,7 +525,8 @@ auto BM_ShuffleWriteRead = [](::benchmark::State& state, auto* listenerPtr = listener.get(); auto* memoryManager = MemoryManager::create(kVeloxBackendKind, std::move(listener)); - auto runtime = runtimeFactory(memoryManager); + auto* threadManager = ThreadManager::create(kVeloxBackendKind, ThreadInitializer::noop()); + auto runtime = runtimeFactory(memoryManager, threadManager); const size_t dirIndex = std::hash{}(std::this_thread::get_id()) % localDirs.size(); const auto dataFileDir = @@ -554,6 +558,7 @@ auto BM_ShuffleWriteRead = [](::benchmark::State& state, updateBenchmarkMetrics(state, elapsedTime, readInputTime, writerMetrics, readerMetrics); Runtime::release(runtime); + ThreadManager::release(threadManager); MemoryManager::release(memoryManager); }; @@ -709,8 +714,8 @@ int main(int argc, char** argv) { } } - RuntimeFactory runtimeFactory = [=](MemoryManager* memoryManager) { - return dynamic_cast(Runtime::create(kVeloxBackendKind, memoryManager, sessionConf)); + RuntimeFactory runtimeFactory = [=](MemoryManager* memoryManager, ThreadManager* threadManager) { + return dynamic_cast(Runtime::create(kVeloxBackendKind, memoryManager, threadManager, sessionConf)); }; const auto localDirs = createLocalDirs(); diff --git a/cpp/velox/benchmarks/ParquetWriteBenchmark.cc b/cpp/velox/benchmarks/ParquetWriteBenchmark.cc index c66f6a3a4f00..a534e1f6d432 100644 --- a/cpp/velox/benchmarks/ParquetWriteBenchmark.cc +++ b/cpp/velox/benchmarks/ParquetWriteBenchmark.cc @@ -23,6 +23,7 @@ #include "memory/VeloxMemoryManager.h" #include "operators/reader/ParquetReaderIterator.h" #include "operators/writer/VeloxParquetDataSource.h" +#include "threads/ThreadInitializer.h" #include "utils/VeloxArrowUtils.h" namespace gluten { @@ -52,7 +53,8 
@@ class GoogleBenchmarkVeloxParquetWriteCacheScanBenchmark { // reuse the ParquetWriteConverter for batches caused system % increase a lot auto memoryManager = getDefaultMemoryManager(); - auto runtime = Runtime::create(kVeloxBackendKind, memoryManager); + auto* threadManager = ThreadManager::create(kVeloxBackendKind, ThreadInitializer::noop()); + auto runtime = Runtime::create(kVeloxBackendKind, memoryManager, threadManager); auto veloxPool = memoryManager->getAggregateMemoryPool(); for (auto _ : state) { @@ -98,6 +100,7 @@ class GoogleBenchmarkVeloxParquetWriteCacheScanBenchmark { state.counters["write_time"] = benchmark::Counter(writeTime, benchmark::Counter::kAvgThreads, benchmark::Counter::OneK::kIs1000); Runtime::release(runtime); + ThreadManager::release(threadManager); } private: diff --git a/cpp/velox/compute/VeloxBackend.cc b/cpp/velox/compute/VeloxBackend.cc index 4738d2e3a6a5..56b03848508e 100644 --- a/cpp/velox/compute/VeloxBackend.cc +++ b/cpp/velox/compute/VeloxBackend.cc @@ -84,15 +84,37 @@ void veloxMemoryManagerReleaser(MemoryManager* memoryManager) { Runtime* veloxRuntimeFactory( const std::string& kind, MemoryManager* memoryManager, + ThreadManager* threadManager, const std::unordered_map& sessionConf) { auto* vmm = dynamic_cast(memoryManager); GLUTEN_CHECK(vmm != nullptr, "Not a Velox memory manager"); - return new VeloxRuntime(kind, vmm, sessionConf); + return new VeloxRuntime(kind, vmm, threadManager, sessionConf); } void veloxRuntimeReleaser(Runtime* runtime) { delete runtime; } + +class VeloxThreadManager : public ThreadManager { + public: + VeloxThreadManager(const std::string& kind, std::unique_ptr initializer) + : ThreadManager(kind), initializer_(std::shared_ptr(std::move(initializer))) {} + + ThreadInitializer* getThreadInitializer() override { + return initializer_.get(); + } + + private: + std::shared_ptr initializer_; +}; + +ThreadManager* veloxThreadManagerFactory(const std::string& kind, std::unique_ptr initializer) { + return 
new VeloxThreadManager(kind, std::move(initializer)); +} + +void veloxThreadManagerReleaser(ThreadManager* threadManager) { + delete threadManager; +} } // namespace void VeloxBackend::init( @@ -105,6 +127,7 @@ void VeloxBackend::init( // Register factories. MemoryManager::registerFactory(kVeloxBackendKind, veloxMemoryManagerFactory, veloxMemoryManagerReleaser); + ThreadManager::registerFactory(kVeloxBackendKind, veloxThreadManagerFactory, veloxThreadManagerReleaser); Runtime::registerFactory(kVeloxBackendKind, veloxRuntimeFactory, veloxRuntimeReleaser); if (backendConf_->get(kDebugModeEnabled, false)) { @@ -186,6 +209,29 @@ void VeloxBackend::init( } #endif + const int32_t numTaskSlotsPerExecutor = backendConf_->get(kNumTaskSlotsPerExecutor, kNumTaskSlotsPerExecutorDefault); + GLUTEN_CHECK( + numTaskSlotsPerExecutor >= 0, + kNumTaskSlotsPerExecutor + " was set to negative number " + std::to_string(numTaskSlotsPerExecutor) + ", this should not happen."); + + const bool parallelExecutionEnabled = backendConf_->get(kParallelExecutionEnabled, kParallelExecutionEnabledDefault); + if (parallelExecutionEnabled) { + // Default: 2 * task slots. 
+ int32_t threadPoolSize = backendConf_->get(kParallelExecutionThreadPoolSize, 2 * numTaskSlotsPerExecutor); + executor_ = std::make_unique(threadPoolSize); + LOG(INFO) << "Initialized CPUThreadPoolExecutor for parallel execution with thread num: " << threadPoolSize + << " (numTaskSlotsPerExecutor: " << numTaskSlotsPerExecutor << ")"; + } + + const auto spillThreadNum = backendConf_->get(kSpillThreadNum, kSpillThreadNumDefaultValue); + if (spillThreadNum > 0) { + const auto spillThreadNumOnExecutor = spillThreadNum * numTaskSlotsPerExecutor; + spillExecutor_ = std::make_unique(spillThreadNumOnExecutor); + LOG(INFO) << "Initialized CPUThreadPoolExecutor for spill with thread num: " << spillThreadNumOnExecutor + << " (spillThreadNum: " << spillThreadNum + << ", numTaskSlotsPerExecutor: " << numTaskSlotsPerExecutor << ")"; + } + initJolFilesystem(); initConnector(hiveConf); @@ -309,7 +355,8 @@ void VeloxBackend::initCache() { } void VeloxBackend::initConnector(const std::shared_ptr& hiveConf) { - auto ioThreads = backendConf_->get(kVeloxIOThreads, kVeloxIOThreadsDefault); + auto ioThreads = backendConf_->get(kVeloxIOThreads, + backendConf_->get(kNumTaskSlotsPerExecutor, kNumTaskSlotsPerExecutorDefault)); GLUTEN_CHECK( ioThreads >= 0, kVeloxIOThreads + " was set to negative number " + std::to_string(ioThreads) + ", this should not happen."); diff --git a/cpp/velox/compute/VeloxBackend.h b/cpp/velox/compute/VeloxBackend.h index 9176b369778c..e15c9c4e65e6 100644 --- a/cpp/velox/compute/VeloxBackend.h +++ b/cpp/velox/compute/VeloxBackend.h @@ -58,6 +58,14 @@ class VeloxBackend { } folly::Executor* executor() const { + return executor_.get(); + } + + folly::Executor* spillExecutor() const { + return spillExecutor_.get(); + } + + folly::Executor* ioExecutor() const { return ioExecutor_.get(); } @@ -70,7 +78,10 @@ class VeloxBackend { init(std::move(listener), conf); } - void init(std::unique_ptr listener, const std::unordered_map& conf); + void init( + std::unique_ptr 
listener, + const std::unordered_map& conf); + void initCache(); void initConnector(const std::shared_ptr& hiveConf); void initUdf(); @@ -89,6 +100,8 @@ class VeloxBackend { // Instance of AsyncDataCache used for all large allocations. std::shared_ptr asyncDataCache_; + std::unique_ptr executor_; + std::unique_ptr spillExecutor_; std::unique_ptr ssdCacheExecutor_; std::unique_ptr ioExecutor_; std::shared_ptr cacheAllocator_; diff --git a/cpp/velox/compute/VeloxRuntime.cc b/cpp/velox/compute/VeloxRuntime.cc index d97498586b71..cf196ffac044 100644 --- a/cpp/velox/compute/VeloxRuntime.cc +++ b/cpp/velox/compute/VeloxRuntime.cc @@ -65,8 +65,9 @@ namespace gluten { VeloxRuntime::VeloxRuntime( const std::string& kind, VeloxMemoryManager* vmm, + ThreadManager* threadManager, const std::unordered_map& confMap) - : Runtime(kind, vmm, confMap) { + : Runtime(kind, vmm, threadManager, confMap) { // Refresh session config. veloxCfg_ = std::make_shared(std::unordered_map(confMap_)); @@ -190,6 +191,7 @@ std::shared_ptr VeloxRuntime::createResultIterator( auto wholeStageIter = std::make_unique( memoryManager(), + threadManager(), veloxPlan_, scanIds, scanInfos, diff --git a/cpp/velox/compute/VeloxRuntime.h b/cpp/velox/compute/VeloxRuntime.h index 728cc46c92a4..70cd88bbd4b5 100644 --- a/cpp/velox/compute/VeloxRuntime.h +++ b/cpp/velox/compute/VeloxRuntime.h @@ -40,6 +40,7 @@ class VeloxRuntime final : public Runtime { explicit VeloxRuntime( const std::string& kind, VeloxMemoryManager* vmm, + ThreadManager* threadManager, const std::unordered_map& confMap); void setSparkTaskInfo(SparkTaskInfo taskInfo) override { diff --git a/cpp/velox/compute/WholeStageResultIterator.cc b/cpp/velox/compute/WholeStageResultIterator.cc index 3c0505263159..09a32ab490fc 100644 --- a/cpp/velox/compute/WholeStageResultIterator.cc +++ b/cpp/velox/compute/WholeStageResultIterator.cc @@ -23,6 +23,9 @@ #include "velox/connectors/hive/HiveConfig.h" #include "velox/connectors/hive/HiveConnectorSplit.h" 
#include "velox/exec/PlanNodeStats.h" +#include +#include +#include #ifdef GLUTEN_ENABLE_GPU #include #include "velox/experimental/cudf/CudfConfig.h" @@ -68,10 +71,82 @@ const std::string kWriteIOTime = "writeIOWallNanos"; // others const std::string kHiveDefaultPartition = "__HIVE_DEFAULT_PARTITION__"; +class HookedExecutor final : public folly::Executor { +public: + HookedExecutor( + folly::Executor::KeepAlive parent, + std::string prefix, + ThreadInitializer* initializer) + : parent_(std::move(parent)), + prefix_(std::move(prefix)), + initializer_(initializer) {} + + void add(folly::Func func) override { + inFlight_.fetch_add(1, std::memory_order_relaxed); + parent_->add(wrap(std::move(func))); + } + + void addWithPriority(folly::Func func, int8_t priority) override { + inFlight_.fetch_add(1, std::memory_order_relaxed); + parent_->addWithPriority(wrap(std::move(func)), priority); + } + + uint8_t getNumPriorities() const override { + return parent_->getNumPriorities(); + } + + void join() { + std::unique_lock lock(mutex_); + cv_.wait(lock, [&] { + return inFlight_.load(std::memory_order_acquire) == 0; + }); + } + +private: + folly::Func wrap(folly::Func func) { + auto seq = suffix_.fetch_add(1, std::memory_order_relaxed); + auto taskName = prefix_ + std::to_string(seq); + auto* initializer = initializer_; + auto* self = this; + + return [func = std::move(func), + taskName = std::move(taskName), + initializer, + self]() mutable { + auto done = folly::makeGuard([&] { + if (self->inFlight_.fetch_sub(1, std::memory_order_acq_rel) == 1) { + std::lock_guard lock(self->mutex_); + self->cv_.notify_all(); + } + }); + + GLUTEN_CHECK(initializer != nullptr, "ThreadInitializer is null."); + initializer->initialize(taskName); + auto guard = folly::makeGuard([&] { + initializer->destroy(taskName); + }); + + func(); + }; + } + + folly::Executor::KeepAlive parent_; + std::string prefix_; + ThreadInitializer* initializer_; + std::atomic suffix_{0}; + + std::atomic inFlight_{0}; + 
std::mutex mutex_; + std::condition_variable cv_; +}; + } // namespace + + WholeStageResultIterator::WholeStageResultIterator( VeloxMemoryManager* memoryManager, + ThreadManager* threadManager, const std::shared_ptr& planNode, const std::vector& scanNodeIds, const std::vector>& scanInfos, @@ -80,6 +155,7 @@ WholeStageResultIterator::WholeStageResultIterator( const std::shared_ptr& veloxCfg, const SparkTaskInfo& taskInfo) : memoryManager_(memoryManager), + threadManager_(threadManager), veloxCfg_(veloxCfg), #ifdef GLUTEN_ENABLE_GPU enableCudf_(veloxCfg_->get(kCudfEnabled, kCudfEnabledDefault)), @@ -90,37 +166,42 @@ WholeStageResultIterator::WholeStageResultIterator( scanInfos_(scanInfos), streamIds_(streamIds) { spillStrategy_ = veloxCfg_->get(kSpillStrategy, kSpillStrategyDefaultValue); - auto spillThreadNum = veloxCfg_->get(kSpillThreadNum, kSpillThreadNumDefaultValue); - if (spillThreadNum > 0) { - spillExecutor_ = std::make_shared(spillThreadNum); - } getOrderedNodeIds(veloxPlan_, orderedNodeIds_); auto fileSystem = velox::filesystems::getFileSystem(spillDir, nullptr); GLUTEN_CHECK(fileSystem != nullptr, "File System for spilling is null!"); fileSystem->mkdir(spillDir); - velox::common::SpillDiskOptions spillOpts{ - .spillDirPath = spillDir, .spillDirCreated = true, .spillDirCreateCb = nullptr}; - // Create task instance. 
std::unordered_set emptySet; - velox::core::PlanFragment planFragment{planNode, velox::core::ExecutionStrategy::kUngrouped, 1, emptySet}; - std::shared_ptr queryCtx = createNewVeloxQueryCtx(); - task_ = velox::exec::Task::create( - fmt::format( - "Gluten_Stage_{}_TID_{}_VTID_{}", - std::to_string(taskInfo_.stageId), - std::to_string(taskInfo_.taskId), - std::to_string(taskInfo.vId)), - std::move(planFragment), - 0, - std::move(queryCtx), - velox::exec::Task::ExecutionMode::kSerial, - /*consumer=*/velox::exec::Consumer{}, - /*memoryArbitrationPriority=*/0, - /*spillDiskOpts=*/spillOpts, - /*onError=*/nullptr); - if (!task_->supportSerialExecutionMode()) { + const bool parallelExecutionEnabled = VeloxBackend::get()->getBackendConf()->get(kParallelExecutionEnabled, kParallelExecutionEnabledDefault); + const bool serialExecution = !parallelExecutionEnabled; + if (!serialExecution) { + auto globalExecutor = VeloxBackend::get()->executor(); + GLUTEN_CHECK(globalExecutor != nullptr, "VeloxBackend is null!"); + auto initializer = threadManager_->getThreadInitializer(); + auto parallelTaskId = fmt::format( + "Gluten_Parallel_Task_{}_TID_{}_VTID_{}", + std::to_string(taskInfo_.stageId), + std::to_string(taskInfo_.taskId), + std::to_string(taskInfo_.vId)); + taskExecutor_ = std::make_unique(globalExecutor, parallelTaskId, initializer); + } + + facebook::velox::exec::CursorParameters params; + params.planNode = planNode; + params.destination = 0; + params.maxDrivers = serialExecution ? 
1 : veloxCfg_->get(kParallelExecutionMaxDrivers, kParallelExecutionMaxDriversDefault); + params.queryCtx = createNewVeloxQueryCtx(); + params.executionStrategy = velox::core::ExecutionStrategy::kUngrouped; + params.groupedExecutionLeafNodeIds = std::move(emptySet); + params.numSplitGroups = 1; + params.spillDirectory = spillDir; + params.serialExecution = serialExecution; + params.copyResult = false; + params.outputPool = memoryManager_->getLeafMemoryPool(); + cursor_ = velox::exec::TaskCursor::create(params); + task_ = cursor_->task(); + if (serialExecution && !task_->supportSerialExecutionMode()) { throw std::runtime_error("Task doesn't support single threaded execution: " + planNode->toString()); } @@ -211,16 +292,33 @@ WholeStageResultIterator::WholeStageResultIterator( } } +WholeStageResultIterator::~WholeStageResultIterator() { + if (task_ != nullptr && task_->isRunning()) { + // calling .wait() may take no effect in single thread execution mode + task_->requestCancel().wait(); + } + if (taskExecutor_ != nullptr) { + auto* hookedExecutor = dynamic_cast(taskExecutor_.get()); + hookedExecutor->join(); + } + taskExecutor_.reset(); +#ifdef GLUTEN_ENABLE_GPU + if (enableCudf_) { + unlockGpu(); + } +#endif +} + std::shared_ptr WholeStageResultIterator::createNewVeloxQueryCtx() { std::unordered_map> connectorConfigs; connectorConfigs[kHiveConnectorId] = createHiveConnectorSessionConfig(veloxCfg_); std::shared_ptr ctx = velox::core::QueryCtx::create( - nullptr, + taskExecutor_.get(), facebook::velox::core::QueryConfig{getQueryContextConf()}, connectorConfigs, gluten::VeloxBackend::get()->getAsyncDataCache(), memoryManager_->getAggregateMemoryPool(), - spillExecutor_.get(), + gluten::VeloxBackend::get()->spillExecutor(), fmt::format( "Gluten_Stage_{}_TID_{}_VTID_{}", std::to_string(taskInfo_.stageId), @@ -230,41 +328,24 @@ std::shared_ptr WholeStageResultIterator::createNewVeloxQ } std::shared_ptr WholeStageResultIterator::next() { - if (task_->isFinished()) { - 
return nullptr; - } - velox::RowVectorPtr vector; while (true) { - auto future = velox::ContinueFuture::makeEmpty(); - auto out = task_->next(&future); - if (!future.valid()) { - // Not need to wait. Break. - vector = std::move(out); - break; + if (!cursor_->moveNext()) { + return nullptr; } - // Velox suggested to wait. This might be because another thread (e.g., background io thread) is spilling the task. - GLUTEN_CHECK(out == nullptr, "Expected to wait but still got non-null output from Velox task"); - VLOG(2) << "Velox task " << task_->taskId() - << " is busy when ::next() is called. Will wait and try again. Task state: " - << taskStateString(task_->state()); - future.wait(); - } - if (vector == nullptr) { - return nullptr; - } - uint64_t numRows = vector->size(); - if (numRows == 0) { - return nullptr; - } - - { - ScopedTimer timer(&loadLazyVectorTime_); - for (auto& child : vector->children()) { - child->loadedVector(); + RowVectorPtr vector = cursor_->current(); + GLUTEN_CHECK(vector != nullptr, "Cursor returned null vector."); + uint64_t numRows = vector->size(); + if (numRows == 0) { + continue; + } + { + ScopedTimer timer(&loadLazyVectorTime_); + for (auto& child : vector->children()) { + child->loadedVector(); + } } + return std::make_shared(vector); } - - return std::make_shared(vector); } int64_t WholeStageResultIterator::spillFixedSize(int64_t size) { @@ -296,9 +377,6 @@ void WholeStageResultIterator::getOrderedNodeIds( std::vector& nodeIds) { bool isProjectNode = (std::dynamic_pointer_cast(planNode) != nullptr); bool isLocalExchangeNode = (std::dynamic_pointer_cast(planNode) != nullptr); - bool isUnionNode = isLocalExchangeNode && - std::dynamic_pointer_cast(planNode)->type() == - velox::core::LocalPartitionNode::Type::kGather; const auto& sourceNodes = planNode->sources(); if (isProjectNode) { GLUTEN_CHECK(sourceNodes.size() == 1, "Illegal state"); @@ -314,22 +392,24 @@ void WholeStageResultIterator::getOrderedNodeIds( return; } - if (isUnionNode) { 
- // FIXME: The whole metrics system in gluten-substrait is magic. Passing metrics trees through JNI with a trivial - // array is possible but requires for a solid design. Apparently we haven't had it. All the code requires complete - // rework. - // Union was interpreted as LocalPartition + LocalExchange + 2 fake projects as children in Velox. So we only fetch - // metrics from the root node. - std::vector> unionChildren{}; + if (isLocalExchangeNode) { + // LocalPartition was interpreted as LocalPartition + LocalExchange + 2 fake projects (optional) as children + // in SubstraitToVeloxPlan. So we only fetch metrics from the root node. for (const auto& source : planNode->sources()) { const auto projectedChild = std::dynamic_pointer_cast(source); - GLUTEN_CHECK(projectedChild != nullptr, "Illegal state"); - const auto projectSources = projectedChild->sources(); - GLUTEN_CHECK(projectSources.size() == 1, "Illegal state"); - const auto projectSource = projectSources.at(0); - getOrderedNodeIds(projectSource, nodeIds); + if (projectedChild != nullptr) { + const auto projectSources = projectedChild->sources(); + GLUTEN_CHECK(projectSources.size() == 1, "Illegal state"); + const auto projectSource = projectSources.at(0); + getOrderedNodeIds(projectSource, nodeIds); + } else { + getOrderedNodeIds(source, nodeIds); + } + } + if (planNode->sources().size() == 2) { + // The LocalPartition maps to a concrete Spark native union transformer operator. 
+ nodeIds.emplace_back(planNode->id()); } - nodeIds.emplace_back(planNode->id()); return; } @@ -390,6 +470,7 @@ void WholeStageResultIterator::noMoreSplits() { for (const auto& streamId : streamIds_) { task_->noMoreSplits(streamId); } + cursor_->setNoMoreSplits(); allSplitsAdded_ = true; } diff --git a/cpp/velox/compute/WholeStageResultIterator.h b/cpp/velox/compute/WholeStageResultIterator.h index 9bb6ef8b11f7..e76e25df4f2e 100644 --- a/cpp/velox/compute/WholeStageResultIterator.h +++ b/cpp/velox/compute/WholeStageResultIterator.h @@ -23,9 +23,11 @@ #include "substrait/SubstraitToVeloxPlan.h" #include "substrait/plan.pb.h" #include "utils/Metrics.h" +#include #include "velox/common/config/Config.h" #include "velox/connectors/hive/iceberg/IcebergSplit.h" #include "velox/core/PlanNode.h" +#include "velox/exec/Cursor.h" #include "velox/exec/Task.h" #ifdef GLUTEN_ENABLE_GPU #include "cudf/GpuLock.h" @@ -37,6 +39,7 @@ class WholeStageResultIterator : public SplitAwareColumnarBatchIterator { public: WholeStageResultIterator( VeloxMemoryManager* memoryManager, + ThreadManager* threadManager, const std::shared_ptr& planNode, const std::vector& scanNodeIds, const std::vector>& scanInfos, @@ -45,17 +48,7 @@ class WholeStageResultIterator : public SplitAwareColumnarBatchIterator { const std::shared_ptr& veloxCfg, const SparkTaskInfo& taskInfo); - virtual ~WholeStageResultIterator() { - if (task_ != nullptr && task_->isRunning()) { - // calling .wait() may take no effect in single thread execution mode - task_->requestCancel().wait(); - } -#ifdef GLUTEN_ENABLE_GPU - if (enableCudf_) { - unlockGpu(); - } -#endif - } + virtual ~WholeStageResultIterator(); std::shared_ptr next() override; @@ -119,6 +112,7 @@ class WholeStageResultIterator : public SplitAwareColumnarBatchIterator { /// Memory. VeloxMemoryManager* memoryManager_; + ThreadManager* threadManager_; /// Config, task and plan. 
const std::shared_ptr veloxCfg_; @@ -126,12 +120,13 @@ class WholeStageResultIterator : public SplitAwareColumnarBatchIterator { const bool enableCudf_; #endif const SparkTaskInfo taskInfo_; + std::unique_ptr taskExecutor_; + std::unique_ptr cursor_; std::shared_ptr task_; std::shared_ptr veloxPlan_; /// Spill. std::string spillStrategy_; - std::shared_ptr spillExecutor_ = nullptr; /// Metrics std::unique_ptr metrics_{}; diff --git a/cpp/velox/config/VeloxConfig.h b/cpp/velox/config/VeloxConfig.h index 54db73303184..e6108a94d8f6 100644 --- a/cpp/velox/config/VeloxConfig.h +++ b/cpp/velox/config/VeloxConfig.h @@ -82,6 +82,17 @@ const std::string kHashProbeDynamicFilterPushdownEnabled = const std::string kHashProbeBloomFilterPushdownMaxSize = "spark.gluten.sql.columnar.backend.velox.hashProbe.bloomFilterPushdown.maxSize"; +const std::string kParallelExecutionEnabled = + "spark.gluten.sql.columnar.backend.velox.parallelExecution.enabled"; +const bool kParallelExecutionEnabledDefault = false; + +const std::string kParallelExecutionThreadPoolSize = + "spark.gluten.sql.columnar.backend.velox.parallelExecution.threadPoolSize"; + +const std::string kParallelExecutionMaxDrivers = + "spark.gluten.sql.columnar.backend.velox.parallelExecution.maxDrivers"; +const int32_t kParallelExecutionMaxDriversDefault = 4; + const std::string kValueStreamDynamicFilterEnabled = "spark.gluten.sql.columnar.backend.velox.valueStream.dynamicFilter.enabled"; const bool kValueStreamDynamicFilterEnabledDefault = false; @@ -137,8 +148,9 @@ const std::string kVeloxSsdCheckSumReadVerificationEnabled = "spark.gluten.sql.columnar.backend.velox.ssdChecksumReadVerificationEnabled"; // async +const std::string kNumTaskSlotsPerExecutor = "spark.gluten.numTaskSlotsPerExecutor"; +const int32_t kNumTaskSlotsPerExecutorDefault = -1; const std::string kVeloxIOThreads = "spark.gluten.sql.columnar.backend.velox.IOThreads"; -const uint32_t kVeloxIOThreadsDefault = 0; const std::string 
kVeloxAsyncTimeoutOnTaskStopping = "spark.gluten.sql.columnar.backend.velox.asyncTimeoutOnTaskStopping"; const int32_t kVeloxAsyncTimeoutOnTaskStoppingDefault = 30000; // 30s diff --git a/cpp/velox/jni/VeloxJniWrapper.cc b/cpp/velox/jni/VeloxJniWrapper.cc index 4705a646e2d6..a39fe92101a9 100644 --- a/cpp/velox/jni/VeloxJniWrapper.cc +++ b/cpp/velox/jni/VeloxJniWrapper.cc @@ -1072,7 +1072,7 @@ JNIEXPORT jlong JNICALL Java_org_apache_gluten_vectorized_HashJoinBuilder_native facebook::velox::exec::BaseHashTable::kNoSpillInputStartPartitionBit, 1'000'000, hashTableBuilders[0]->dropDuplicates(), - allowParallelJoinBuild ? VeloxBackend::get()->executor() : nullptr); + allowParallelJoinBuild ? VeloxBackend::get()->ioExecutor() : nullptr); for (int i = 1; i < numThreads; ++i) { if (hashTableBuilders[i]->joinHasNullKeys()) { diff --git a/cpp/velox/operators/plannodes/CudfVectorStream.h b/cpp/velox/operators/plannodes/CudfVectorStream.h index 7a663d252e4c..7ec9476ad267 100644 --- a/cpp/velox/operators/plannodes/CudfVectorStream.h +++ b/cpp/velox/operators/plannodes/CudfVectorStream.h @@ -134,6 +134,11 @@ class CudfValueStreamNode final : public facebook::velox::core::PlanNode { std::shared_ptr iterator) : facebook::velox::core::PlanNode(id), outputType_(outputType), iterator_(std::move(iterator)) {} + // Only supports single thread because iterator_ is not guaranteed thread-safe. 
+ bool requiresSingleThread() const override { + return true; + } + const facebook::velox::RowTypePtr& outputType() const override { return outputType_; } diff --git a/cpp/velox/substrait/SubstraitToVeloxPlan.cc b/cpp/velox/substrait/SubstraitToVeloxPlan.cc index adb7fc5f45b6..c0a34f89d248 100644 --- a/cpp/velox/substrait/SubstraitToVeloxPlan.cc +++ b/cpp/velox/substrait/SubstraitToVeloxPlan.cc @@ -23,6 +23,8 @@ #include "operators/hashjoin/HashTableBuilder.h" #include "operators/plannodes/RowVectorStream.h" #include "velox/connectors/hive/HiveDataSink.h" +#include "velox/exec/HashPartitionFunction.h" +#include "velox/exec/RoundRobinPartitionFunction.h" #include "velox/exec/TableWriter.h" #include "velox/type/Type.h" @@ -30,6 +32,7 @@ #include "utils/ObjectStore.h" #include "utils/VeloxWriterUtils.h" +#include "compute/VeloxBackend.h" #include "config.pb.h" #include "config/GlutenConfig.h" #include "config/VeloxConfig.h" @@ -419,8 +422,11 @@ core::PlanNodePtr SubstraitToVeloxPlanConverter::toVeloxPlan(const ::substrait:: opaqueSharedHashTable = nullptr; } + // BHJ left side can be round-robin parallelized. 
+ leftNode = addRoundRobinPartitionForParallelExecution(leftNode); + // Create HashJoinNode node - return std::make_shared( + auto hashJoinNode = std::make_shared( nextPlanNodeId(), joinType, isNullAwareAntiJoin, @@ -433,6 +439,8 @@ core::PlanNodePtr SubstraitToVeloxPlanConverter::toVeloxPlan(const ::substrait:: false, joinHasNullKeys, opaqueSharedHashTable); + auto gatheredHashJoinNode = addGatherForParallelExecution(hashJoinNode); + return gatheredHashJoinNode; } else { // Create HashJoinNode node return std::make_shared( @@ -1229,6 +1237,92 @@ core::PlanNodePtr SubstraitToVeloxPlanConverter::toVeloxPlan( childNode); } +bool SubstraitToVeloxPlanConverter::isParallelExecutionEnabled() const { + return VeloxBackend::get()->getBackendConf()->get(kParallelExecutionEnabled, kParallelExecutionEnabledDefault); +} + +core::PlanNodePtr SubstraitToVeloxPlanConverter::createLocalPartitionNode( + core::LocalPartitionNode::Type type, + core::PartitionFunctionSpecPtr partitionFunctionSpec, + const std::vector& sources) { + GLUTEN_CHECK(!sources.empty(), "At least one source is required for Velox LocalPartition"); + + std::vector newChildren; + newChildren.reserve(sources.size()); + const bool isSingleSource = sources.size() == 1; + const RowTypePtr outRowType = asRowType(sources[0]->outputType()); + std::vector outNames; + outNames.reserve(outRowType->size()); + for (int32_t colIdx = 0; colIdx < outRowType->size(); ++colIdx) { + outNames.push_back(outRowType->nameOf(colIdx)); + } + for (const auto& child : sources) { + const RowTypePtr& childRowType = child->outputType(); + std::vector expressions; + expressions.reserve(outNames.size()); + for (int32_t colIdx = 0; colIdx < outNames.size(); ++colIdx) { + core::TypedExprPtr expr = + std::make_shared(childRowType->childAt(colIdx), childRowType->nameOf(colIdx)); + if (!isSingleSource) { + // Unifies children types for multi-children case (e.g., union). 
+ expr = std::make_shared(outRowType->childAt(colIdx), expr, false); + } + expressions.push_back(expr); + } + newChildren.push_back(std::make_shared(nextPlanNodeId(), outNames, expressions, child)); + } + + return std::make_shared( + nextPlanNodeId(), type, false, std::move(partitionFunctionSpec), std::move(newChildren)); +} + +core::PlanNodePtr SubstraitToVeloxPlanConverter::addHashPartitionForParallelExecution( + const core::PlanNodePtr& source, + const std::vector& keys) { + GLUTEN_CHECK(!keys.empty(), "Keys are expected for adding local partition for parallel execution."); + if (!isParallelExecutionEnabled()) { + return source; + } + + core::PartitionFunctionSpecPtr partitionFunctionSpec; + std::vector keyChannels; + keyChannels.reserve(keys.size()); + std::vector constValues; + constValues.reserve(keys.size()); + const auto& outputType = source->outputType(); + for (const auto& key : keys) { + if (auto field = std::dynamic_pointer_cast(key)) { + keyChannels.emplace_back(outputType->getChildIdx(field->name())); + } else if (auto constant = std::dynamic_pointer_cast(key)) { + keyChannels.emplace_back(kConstantChannel); + constValues.push_back(constant->toConstantVector(pool_)); + } else { + VELOX_UNREACHABLE(); + } + } + partitionFunctionSpec = + std::make_shared(outputType, std::move(keyChannels), std::move(constValues)); + + return createLocalPartitionNode(core::LocalPartitionNode::Type::kRepartition,std::move(partitionFunctionSpec),{source}); +} + +core::PlanNodePtr SubstraitToVeloxPlanConverter::addRoundRobinPartitionForParallelExecution( + const core::PlanNodePtr& source) { + if (!isParallelExecutionEnabled()) { + return source; + } + return createLocalPartitionNode(core::LocalPartitionNode::Type::kRepartition, + std::make_shared(),{source}); +} + +core::PlanNodePtr SubstraitToVeloxPlanConverter::addGatherForParallelExecution(const core::PlanNodePtr& source) { + if (!isParallelExecutionEnabled()) { + return source; + } + return 
createLocalPartitionNode(core::LocalPartitionNode::Type::kGather, + std::make_shared(),{source}); +} + core::PlanNodePtr SubstraitToVeloxPlanConverter::toVeloxPlan(const ::substrait::SetRel& setRel) { switch (setRel.op()) { case ::substrait::SetRel_SetOp::SetRel_SetOp_SET_OP_UNION_ALL: { @@ -1238,36 +1332,10 @@ core::PlanNodePtr SubstraitToVeloxPlanConverter::toVeloxPlan(const ::substrait:: children.push_back(toVeloxPlan(input)); } GLUTEN_CHECK(!children.empty(), "At least one source is required for Velox LocalPartition"); - - // Velox doesn't allow different field names in schemas of LocalPartitionNode's children. - // Add project nodes to unify the schemas. - const RowTypePtr outRowType = asRowType(children[0]->outputType()); - std::vector outNames; - for (int32_t colIdx = 0; colIdx < outRowType->size(); ++colIdx) { - const auto name = outRowType->childAt(colIdx)->name(); - outNames.push_back(name); - } - - std::vector projectedChildren; - for (int32_t i = 0; i < children.size(); ++i) { - const auto& child = children[i]; - const RowTypePtr& childRowType = child->outputType(); - std::vector expressions; - for (int32_t colIdx = 0; colIdx < outNames.size(); ++colIdx) { - const auto fa = - std::make_shared(childRowType->childAt(colIdx), childRowType->nameOf(colIdx)); - const auto cast = std::make_shared(outRowType->childAt(colIdx), fa, false); - expressions.push_back(cast); - } - auto project = std::make_shared(nextPlanNodeId(), outNames, expressions, child); - projectedChildren.push_back(project); - } - return std::make_shared( - nextPlanNodeId(), + return createLocalPartitionNode( core::LocalPartitionNode::Type::kGather, - false, std::make_shared(), - projectedChildren); + children); } default: throw GlutenException("Unsupported SetRel op: " + std::to_string(setRel.op())); @@ -1377,7 +1445,8 @@ core::PlanNodePtr SubstraitToVeloxPlanConverter::constructValueStreamNode( splitInfo->leafType = SplitInfo::LeafType::SPLIT_AWARE_STREAM; splitInfoMap_[tableScanNode->id()] 
= splitInfo; - return tableScanNode; + auto gatheredTableScanNode = addGatherForParallelExecution(tableScanNode); + return gatheredTableScanNode; } #ifdef GLUTEN_ENABLE_GPU @@ -1530,7 +1599,8 @@ core::PlanNodePtr SubstraitToVeloxPlanConverter::toVeloxPlan(const ::substrait:: nextPlanNodeId(), std::move(outputType), std::move(tableHandle), assignments); // Set split info map. splitInfoMap_[tableScanNode->id()] = splitInfo; - return tableScanNode; + auto gatheredTableScanNode = addGatherForParallelExecution(tableScanNode); + return gatheredTableScanNode; } } diff --git a/cpp/velox/substrait/SubstraitToVeloxPlan.h b/cpp/velox/substrait/SubstraitToVeloxPlan.h index 47bf3a0525b1..7e8fd43aacb7 100644 --- a/cpp/velox/substrait/SubstraitToVeloxPlan.h +++ b/cpp/velox/substrait/SubstraitToVeloxPlan.h @@ -277,6 +277,19 @@ class SubstraitToVeloxPlanConverter { const ::substrait::WindowType& type, const RowTypePtr& inputType); + bool isParallelExecutionEnabled() const; + + core::PlanNodePtr createLocalPartitionNode( + core::LocalPartitionNode::Type type, + core::PartitionFunctionSpecPtr partitionFunctionSpec, + const std::vector& sources); + + core::PlanNodePtr addHashPartitionForParallelExecution( + const core::PlanNodePtr& source, + const std::vector& keys); + core::PlanNodePtr addRoundRobinPartitionForParallelExecution(const core::PlanNodePtr& source); + core::PlanNodePtr addGatherForParallelExecution(const core::PlanNodePtr& source); + /// The unique identification for each PlanNode. 
int planNodeId_ = 0; diff --git a/cpp/velox/tests/RuntimeTest.cc b/cpp/velox/tests/RuntimeTest.cc index 5487e7fd2e4e..ec67b0365887 100644 --- a/cpp/velox/tests/RuntimeTest.cc +++ b/cpp/velox/tests/RuntimeTest.cc @@ -20,6 +20,7 @@ #include #include "compute/VeloxBackend.h" #include "memory.pb.h" +#include "threads/ThreadInitializer.h" namespace gluten { @@ -46,13 +47,26 @@ class DummyMemoryManager final : public MemoryManager { inline static const std::string kDummyBackendKind{"dummy"}; +class DummyThreadManager final : public ThreadManager { + public: + explicit DummyThreadManager(const std::string& kind) : ThreadManager(kind), initializer_(ThreadInitializer::noop()) {} + + std::shared_ptr getThreadInitializer() override { + return initializer_; + } + + private: + std::shared_ptr initializer_; +}; + class DummyRuntime final : public Runtime { public: DummyRuntime( const std::string& kind, DummyMemoryManager* mm, + ThreadManager* tm, const std::unordered_map& conf) - : Runtime(kind, mm, conf) {} + : Runtime(kind, mm, tm, conf) {} void parsePlan(const uint8_t* data, int32_t size) override {} @@ -127,8 +141,9 @@ class DummyRuntime final : public Runtime { static Runtime* dummyRuntimeFactory( const std::string& kind, MemoryManager* mm, + ThreadManager* tm, const std::unordered_map conf) { - return new DummyRuntime(kind, dynamic_cast(mm), conf); + return new DummyRuntime(kind, dynamic_cast(mm), tm, conf); } static void dummyRuntimeReleaser(Runtime* runtime) { @@ -138,7 +153,8 @@ static void dummyRuntimeReleaser(Runtime* runtime) { TEST(TestRuntime, CreateRuntime) { Runtime::registerFactory(kDummyBackendKind, dummyRuntimeFactory, dummyRuntimeReleaser); DummyMemoryManager mm(kDummyBackendKind); - auto runtime = Runtime::create(kDummyBackendKind, &mm); + DummyThreadManager tm(kDummyBackendKind); + auto runtime = Runtime::create(kDummyBackendKind, &mm, &tm); ASSERT_EQ(typeid(*runtime), typeid(DummyRuntime)); Runtime::release(runtime); } @@ -146,14 +162,18 @@ 
TEST(TestRuntime, CreateRuntime) { TEST(TestRuntime, CreateVeloxRuntime) { VeloxBackend::create(AllocationListener::noop(), {}); auto mm = MemoryManager::create(kVeloxBackendKind, AllocationListener::noop()); - auto runtime = Runtime::create(kVeloxBackendKind, mm); + auto tm = ThreadManager::create(kVeloxBackendKind, ThreadInitializer::noop()); + auto runtime = Runtime::create(kVeloxBackendKind, mm, tm); ASSERT_EQ(typeid(*runtime), typeid(VeloxRuntime)); Runtime::release(runtime); + ThreadManager::release(tm); } TEST(TestRuntime, GetResultIterator) { DummyMemoryManager mm(kDummyBackendKind); - auto runtime = std::make_shared(kDummyBackendKind, &mm, std::unordered_map()); + DummyThreadManager tm(kDummyBackendKind); + auto runtime = + std::make_shared(kDummyBackendKind, &mm, &tm, std::unordered_map()); auto iter = runtime->createResultIterator("/tmp/test-spill", {}); runtime->noMoreSplits(iter.get()); ASSERT_TRUE(iter->hasNext()); diff --git a/ep/build-velox/src/get-velox.sh b/ep/build-velox/src/get-velox.sh index 1e3f71c3582e..6d6cf8fd1cbf 100755 --- a/ep/build-velox/src/get-velox.sh +++ b/ep/build-velox/src/get-velox.sh @@ -25,7 +25,7 @@ RUN_SETUP_SCRIPT=ON ENABLE_ENHANCED_FEATURES=OFF # Developer use only for testing Velox PR. 
-UPSTREAM_VELOX_PR_ID="" +UPSTREAM_VELOX_PR_ID="16991" OS=`uname -s` diff --git a/gluten-arrow/src/main/java/org/apache/gluten/runtime/RuntimeJniWrapper.java b/gluten-arrow/src/main/java/org/apache/gluten/runtime/RuntimeJniWrapper.java index 8c4280a3b53d..b661e31b6acf 100644 --- a/gluten-arrow/src/main/java/org/apache/gluten/runtime/RuntimeJniWrapper.java +++ b/gluten-arrow/src/main/java/org/apache/gluten/runtime/RuntimeJniWrapper.java @@ -20,7 +20,7 @@ public class RuntimeJniWrapper { private RuntimeJniWrapper() {} - public static native long createRuntime(String backendType, long nmm, byte[] sessionConf); + public static native long createRuntime(String backendType, long nmm, long ntm, byte[] sessionConf); public static native void releaseRuntime(long handle); } diff --git a/gluten-arrow/src/main/java/org/apache/gluten/threads/NativeThreadInitializer.java b/gluten-arrow/src/main/java/org/apache/gluten/threads/NativeThreadInitializer.java new file mode 100644 index 000000000000..0812b94b12c1 --- /dev/null +++ b/gluten-arrow/src/main/java/org/apache/gluten/threads/NativeThreadInitializer.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.gluten.threads; + +public interface NativeThreadInitializer { + void initialize(String threadName); + void destroy(String threadName); +} diff --git a/gluten-arrow/src/main/java/org/apache/gluten/threads/NativeThreadManagerJniWrapper.java b/gluten-arrow/src/main/java/org/apache/gluten/threads/NativeThreadManagerJniWrapper.java new file mode 100644 index 000000000000..74f6cd69de70 --- /dev/null +++ b/gluten-arrow/src/main/java/org/apache/gluten/threads/NativeThreadManagerJniWrapper.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.gluten.threads; + +public class NativeThreadManagerJniWrapper { + private NativeThreadManagerJniWrapper() {} + + public static native long create(String backendType, NativeThreadInitializer initializer); + + public static native void release(long handle); +} diff --git a/gluten-arrow/src/main/java/org/apache/gluten/threads/TaskChildThreadInitializer.java b/gluten-arrow/src/main/java/org/apache/gluten/threads/TaskChildThreadInitializer.java new file mode 100644 index 000000000000..6d6c863f63bb --- /dev/null +++ b/gluten-arrow/src/main/java/org/apache/gluten/threads/TaskChildThreadInitializer.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gluten.threads; + +import com.google.common.base.Preconditions; +import org.apache.spark.TaskContext; +import org.apache.spark.util.SparkTaskUtil; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +public class TaskChildThreadInitializer implements NativeThreadInitializer { + private final TaskContext parentTaskContext; + private final Map childThreads = new ConcurrentHashMap<>(); + + public TaskChildThreadInitializer(TaskContext parentTaskContext) { + Preconditions.checkNotNull(parentTaskContext); + this.parentTaskContext = parentTaskContext; + } + + @Override + public void initialize(String threadName) { + final String javaThreadName = Thread.currentThread().getName(); + if (childThreads.put(threadName, javaThreadName) != null) { + throw new IllegalStateException( + String.format("Task native child thread %s (Java name: %s) is already initialized", + threadName, javaThreadName)); + } + SparkTaskUtil.setTaskContext(parentTaskContext); + } + + @Override + public void destroy(String threadName) { + if (childThreads.remove(threadName) == null) { + throw new IllegalStateException( + String.format("Task native thread %s is not initialized", threadName)); + } + SparkTaskUtil.unsetTaskContext(); + } +} diff --git a/gluten-arrow/src/main/scala/org/apache/gluten/runtime/Runtime.scala b/gluten-arrow/src/main/scala/org/apache/gluten/runtime/Runtime.scala index e57bec619d0e..e1807de70311 100644 --- a/gluten-arrow/src/main/scala/org/apache/gluten/runtime/Runtime.scala +++ b/gluten-arrow/src/main/scala/org/apache/gluten/runtime/Runtime.scala @@ -19,18 +19,18 @@ package org.apache.gluten.runtime import org.apache.gluten.config.GlutenConfig import org.apache.gluten.exception.GlutenException import org.apache.gluten.memory.NativeMemoryManager +import org.apache.gluten.threads.{NativeThreadManager, TaskChildThreadInitializer} import org.apache.gluten.utils.ConfigUtil - import org.apache.spark.sql.internal.{GlutenConfigUtil, 
SQLConf} -import org.apache.spark.task.TaskResource +import org.apache.spark.task.{TaskResource, TaskResources} import java.util import java.util.concurrent.atomic.AtomicBoolean - import scala.collection.JavaConverters._ // for 2.12 trait Runtime { def memoryManager(): NativeMemoryManager + def threadManager(): NativeThreadManager def getHandle(): Long } @@ -51,9 +51,13 @@ object Runtime { with TaskResource { private val nmm: NativeMemoryManager = NativeMemoryManager(backendName, name) + private val ntm: NativeThreadManager = NativeThreadManager( + backendName, + new TaskChildThreadInitializer(TaskResources.getLocalTaskContext())) private val handle = RuntimeJniWrapper.createRuntime( backendName, nmm.getHandle(), + ntm.getHandle(), ConfigUtil.serialize( (GlutenConfig .getNativeSessionConf( @@ -67,6 +71,8 @@ object Runtime { override def memoryManager(): NativeMemoryManager = nmm + override def threadManager(): NativeThreadManager = ntm + override def release(): Unit = { if (!released.compareAndSet(false, true)) { throw new GlutenException( diff --git a/gluten-arrow/src/main/scala/org/apache/gluten/threads/NativeThreadManager.scala b/gluten-arrow/src/main/scala/org/apache/gluten/threads/NativeThreadManager.scala new file mode 100644 index 000000000000..635154dd35d3 --- /dev/null +++ b/gluten-arrow/src/main/scala/org/apache/gluten/threads/NativeThreadManager.scala @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
/** Exposes the handle of a native thread manager created through JNI. */
trait NativeThreadManager {

  /** Handle returned by `NativeThreadManagerJniWrapper.create` for this manager. */
  def getHandle(): Long
}

object NativeThreadManager {

  /**
   * Creates a native thread manager and registers it through
   * `TaskResources.addAnonymousResource`.
   *
   * @param backendName
   *   backend name forwarded to `NativeThreadManagerJniWrapper.create`
   * @param initializer
   *   callback forwarded to `NativeThreadManagerJniWrapper.create`
   * @return
   *   the value returned by `TaskResources.addAnonymousResource` for the new manager
   */
  def apply(backendName: String, initializer: NativeThreadInitializer): NativeThreadManager =
    TaskResources.addAnonymousResource(new Impl(backendName, initializer))

  /** Task-scoped implementation; the native manager is created eagerly on construction. */
  private class Impl(backendName: String, initializer: NativeThreadInitializer)
    extends NativeThreadManager
    with TaskResource {

    private val handle: Long = NativeThreadManagerJniWrapper.create(backendName, initializer)

    // Flipped exactly once by release(); a second release() is rejected.
    private val releasedFlag = new AtomicBoolean(false)

    override def resourceName(): String = "ntm"

    override def priority(): Int = 20

    override def getHandle(): Long = handle

    /** Releases the native manager; throws [[GlutenException]] when called more than once. */
    override def release(): Unit = {
      val firstRelease = releasedFlag.compareAndSet(false, true)
      if (firstRelease) {
        NativeThreadManagerJniWrapper.release(handle)
      } else {
        throw new GlutenException(
          s"Thread manager instance already released: $handle, ${resourceName()}, ${priority()}")
      }
    }
  }
}
{ if (!inSparkTask()) { throw new UnsupportedOperationException( "Not in a Spark task. If the code is running on driver or for testing purpose, " + - "try using TaskResources#runUnsafe") + "try using TaskResources#runUnsafe. Current thread: " + Thread.currentThread().getName) } val tc = getLocalTaskContext() RESOURCE_REGISTRIES.synchronized { diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/config/GlutenConfig.scala b/gluten-substrait/src/main/scala/org/apache/gluten/config/GlutenConfig.scala index 371c2b96091a..caa81ab13a91 100644 --- a/gluten-substrait/src/main/scala/org/apache/gluten/config/GlutenConfig.scala +++ b/gluten-substrait/src/main/scala/org/apache/gluten/config/GlutenConfig.scala @@ -605,10 +605,8 @@ object GlutenConfig extends ConfigRegistry { (SPARK_S3_CONNECTION_MAXIMUM, "15"), ("spark.gluten.velox.fs.s3a.retry.mode", "legacy"), ( - "spark.gluten.sql.columnar.backend.velox.IOThreads", - conf.getOrElse( - GlutenCoreConfig.NUM_TASK_SLOTS_PER_EXECUTOR.key, - GlutenCoreConfig.NUM_TASK_SLOTS_PER_EXECUTOR.defaultValueString)), + GlutenCoreConfig.NUM_TASK_SLOTS_PER_EXECUTOR.key, + GlutenCoreConfig.NUM_TASK_SLOTS_PER_EXECUTOR.defaultValueString), (COLUMNAR_SHUFFLE_CODEC.key, ""), (COLUMNAR_SHUFFLE_CODEC_BACKEND.key, ""), (DEBUG_CUDF.key, DEBUG_CUDF.defaultValueString),