diff --git a/backends-velox/src/main/scala/org/apache/gluten/config/VeloxConfig.scala b/backends-velox/src/main/scala/org/apache/gluten/config/VeloxConfig.scala index 633979613d85..728421b9f333 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/config/VeloxConfig.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/config/VeloxConfig.scala @@ -211,14 +211,11 @@ object VeloxConfig extends ConfigRegistry { .intConf .createWithDefault(1) - val COLUMNAR_VELOX_ASYNC_TIMEOUT = + val COLUMNAR_VELOX_ASYNC_TIMEOUT_ON_TASK_STOPPING = buildStaticConf("spark.gluten.sql.columnar.backend.velox.asyncTimeoutOnTaskStopping") - .doc( - "Timeout for asynchronous execution when task is being stopped in Velox backend. " + - "It's recommended to set to a number larger than network connection timeout that the " + - "possible aysnc tasks are relying on.") + .doc("Timeout in milliseconds when waiting for runtime-scoped async work to finish during teardown.") .timeConf(TimeUnit.MILLISECONDS) - .createWithDefault(30000) + .createWithDefault(30000L) val COLUMNAR_VELOX_SPLIT_PRELOAD_PER_DRIVER = buildConf("spark.gluten.sql.columnar.backend.velox.SplitPreloadPerDriver") diff --git a/cpp/velox/compute/VeloxBackend.cc b/cpp/velox/compute/VeloxBackend.cc index bc2104f3f63d..a9e8fd71a16e 100644 --- a/cpp/velox/compute/VeloxBackend.cc +++ b/cpp/velox/compute/VeloxBackend.cc @@ -66,9 +66,6 @@ DECLARE_bool(velox_ssd_odirect); DECLARE_bool(velox_memory_pool_capacity_transfer_across_tasks); DECLARE_int32(cache_prefetch_min_pct); -DECLARE_int32(gluten_velox_async_timeout_on_task_stopping); -DEFINE_int32(gluten_velox_async_timeout_on_task_stopping, 30000, "Async timout when task is being stopped"); - using namespace facebook; namespace gluten { @@ -144,14 +141,10 @@ void VeloxBackend::init( // Set velox_memory_use_hugepages. FLAGS_velox_memory_use_hugepages = backendConf_->get(kMemoryUseHugePages, kMemoryUseHugePagesDefault); - // Async timeout. 
- FLAGS_gluten_velox_async_timeout_on_task_stopping = - backendConf_->get(kVeloxAsyncTimeoutOnTaskStopping, kVeloxAsyncTimeoutOnTaskStoppingDefault); - // Set cache_prefetch_min_pct default as 0 to force all loads are prefetched in DirectBufferInput. FLAGS_cache_prefetch_min_pct = backendConf_->get(kCachePrefetchMinPct, 0); - auto hiveConf = createHiveConnectorConfig(backendConf_); + hiveConnectorConfig_ = createHiveConnectorConfig(backendConf_); // Setup and register. velox::filesystems::registerLocalFileSystem(); @@ -167,7 +160,7 @@ void VeloxBackend::init( #endif #ifdef ENABLE_ABFS velox::filesystems::registerAbfsFileSystem(); - velox::filesystems::registerAzureClientProvider(*hiveConf); + velox::filesystems::registerAzureClientProvider(*hiveConnectorConfig_); #endif #ifdef GLUTEN_ENABLE_GPU @@ -187,8 +180,13 @@ void VeloxBackend::init( } #endif + const auto spillThreadNum = backendConf_->get(kSpillThreadNum, kSpillThreadNumDefaultValue); + if (spillThreadNum > 0) { + spillExecutor_ = std::make_unique(spillThreadNum); + } + initJolFilesystem(); - initConnector(hiveConf); + initConnector(hiveConnectorConfig_); velox::dwio::common::registerFileSinks(); velox::parquet::registerParquetReaderFactory(); @@ -310,6 +308,7 @@ void VeloxBackend::initCache() { } void VeloxBackend::initConnector(const std::shared_ptr& hiveConf) { + (void)hiveConf; auto ioThreads = backendConf_->get(kVeloxIOThreads, kVeloxIOThreadsDefault); GLUTEN_CHECK( ioThreads >= 0, @@ -319,24 +318,28 @@ void VeloxBackend::initConnector(const std::shared_ptr>()); } - velox::connector::registerConnector( - std::make_shared(kHiveConnectorId, hiveConf, ioExecutor_.get())); - - // Register value-stream connector for runtime iterator-based inputs - auto valueStreamDynamicFilterEnabled = - backendConf_->get(kValueStreamDynamicFilterEnabled, kValueStreamDynamicFilterEnabledDefault); - velox::connector::registerConnector( - std::make_shared(kIteratorConnectorId, hiveConf, valueStreamDynamicFilterEnabled)); - +} 
+ +std::shared_ptr VeloxBackend::createHiveConnector( + const std::string& connectorId, + folly::Executor* ioExecutor) const { + return std::make_shared(connectorId, hiveConnectorConfig_, ioExecutor); +} + +std::shared_ptr VeloxBackend::createValueStreamConnector( + const std::string& connectorId, + bool dynamicFilterEnabled) const { + return std::make_shared(connectorId, hiveConnectorConfig_, dynamicFilterEnabled); +} + #ifdef GLUTEN_ENABLE_GPU - if (backendConf_->get(kCudfEnableTableScan, kCudfEnableTableScanDefault) && - backendConf_->get(kCudfEnabled, kCudfEnabledDefault)) { - facebook::velox::cudf_velox::connector::hive::CudfHiveConnectorFactory factory; - auto hiveConnector = factory.newConnector(kCudfHiveConnectorId, hiveConf, ioExecutor_.get()); - facebook::velox::connector::registerConnector(hiveConnector); - } -#endif +std::shared_ptr VeloxBackend::createCudfHiveConnector( + const std::string& connectorId, + folly::Executor* ioExecutor) const { + facebook::velox::cudf_velox::connector::hive::CudfHiveConnectorFactory factory; + return factory.newConnector(connectorId, hiveConnectorConfig_, ioExecutor); } +#endif void VeloxBackend::initUdf() { auto got = backendConf_->get(kVeloxUdfLibraryPaths, ""); @@ -376,6 +379,8 @@ void VeloxBackend::tearDown() { // Destruct IOThreadPoolExecutor will join all threads. // On threads exit, thread local variables can be constructed with referencing global variables. // So, we need to destruct IOThreadPoolExecutor and stop the threads before global variables get destructed. 
+ executor_.reset(); + spillExecutor_.reset(); ioExecutor_.reset(); globalMemoryManager_.reset(); diff --git a/cpp/velox/compute/VeloxBackend.h b/cpp/velox/compute/VeloxBackend.h index c6fbf965cf08..9894efa7747c 100644 --- a/cpp/velox/compute/VeloxBackend.h +++ b/cpp/velox/compute/VeloxBackend.h @@ -27,6 +27,7 @@ #include "velox/common/caching/AsyncDataCache.h" #include "velox/common/config/Config.h" #include "velox/common/memory/MmapAllocator.h" +#include "velox/connectors/Connector.h" #include "jni/JniHashTable.h" #include "memory/VeloxMemoryManager.h" @@ -58,9 +59,31 @@ class VeloxBackend { } folly::Executor* executor() const { + return executor_.get(); + } + + folly::Executor* spillExecutor() const { + return spillExecutor_.get(); + } + + folly::Executor* ioExecutor() const { return ioExecutor_.get(); } + std::shared_ptr createHiveConnector( + const std::string& connectorId, + folly::Executor* ioExecutor) const; + + std::shared_ptr createValueStreamConnector( + const std::string& connectorId, + bool dynamicFilterEnabled) const; + +#ifdef GLUTEN_ENABLE_GPU + std::shared_ptr createCudfHiveConnector( + const std::string& connectorId, + folly::Executor* ioExecutor) const; +#endif + void tearDown(); private: @@ -90,8 +113,11 @@ class VeloxBackend { std::shared_ptr asyncDataCache_; std::unique_ptr ssdCacheExecutor_; + std::unique_ptr executor_; + std::unique_ptr spillExecutor_; std::unique_ptr ioExecutor_; std::shared_ptr cacheAllocator_; + std::shared_ptr hiveConnectorConfig_; std::string cachePathPrefix_; std::string cacheFilePrefix_; diff --git a/cpp/velox/compute/VeloxConnectorIds.h b/cpp/velox/compute/VeloxConnectorIds.h new file mode 100644 index 000000000000..e6082bae8bdf --- /dev/null +++ b/cpp/velox/compute/VeloxConnectorIds.h @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include + +namespace gluten { + +struct VeloxConnectorIds { + std::string hive; + std::string iterator; + std::string cudfHive; + bool hiveRegistered{false}; + bool iteratorRegistered{false}; + bool cudfHiveRegistered{false}; +}; + +} // namespace gluten diff --git a/cpp/velox/compute/VeloxPlanConverter.cc b/cpp/velox/compute/VeloxPlanConverter.cc index 627bd396b7df..f3ffab59a6a8 100644 --- a/cpp/velox/compute/VeloxPlanConverter.cc +++ b/cpp/velox/compute/VeloxPlanConverter.cc @@ -30,12 +30,20 @@ VeloxPlanConverter::VeloxPlanConverter( velox::memory::MemoryPool* veloxPool, const facebook::velox::config::ConfigBase* veloxCfg, const std::vector>& rowVectors, + VeloxConnectorIds connectorIds, const std::optional writeFilesTempPath, const std::optional writeFileName, bool validationMode) : validationMode_(validationMode), veloxCfg_(veloxCfg), - substraitVeloxPlanConverter_(veloxPool, veloxCfg, rowVectors, writeFilesTempPath, writeFileName, validationMode) { + substraitVeloxPlanConverter_( + veloxPool, + veloxCfg, + rowVectors, + std::move(connectorIds), + writeFilesTempPath, + writeFileName, + validationMode) { VELOX_USER_CHECK_NOT_NULL(veloxCfg_); } diff --git a/cpp/velox/compute/VeloxPlanConverter.h b/cpp/velox/compute/VeloxPlanConverter.h index 
0b597a91f9ed..1aee2c36bd12 100644 --- a/cpp/velox/compute/VeloxPlanConverter.h +++ b/cpp/velox/compute/VeloxPlanConverter.h @@ -21,6 +21,7 @@ #include #include +#include "compute/VeloxConnectorIds.h" #include "substrait/SubstraitToVeloxPlan.h" #include "substrait/plan.pb.h" @@ -33,6 +34,7 @@ class VeloxPlanConverter { facebook::velox::memory::MemoryPool* veloxPool, const facebook::velox::config::ConfigBase* veloxCfg, const std::vector>& rowVectors, + VeloxConnectorIds connectorIds, const std::optional writeFilesTempPath = std::nullopt, const std::optional writeFileName = std::nullopt, bool validationMode = false); diff --git a/cpp/velox/compute/VeloxRuntime.cc b/cpp/velox/compute/VeloxRuntime.cc index d97498586b71..6beec867b421 100644 --- a/cpp/velox/compute/VeloxRuntime.cc +++ b/cpp/velox/compute/VeloxRuntime.cc @@ -20,19 +20,26 @@ #include #include +#include #include +#include +#include + +#include #include "VeloxBackend.h" #include "compute/ResultIterator.h" #include "compute/Runtime.h" #include "compute/VeloxPlanConverter.h" #include "config/VeloxConfig.h" +#include "operators/plannodes/IteratorSplit.h" #include "operators/serializer/VeloxRowToColumnarConverter.h" #include "shuffle/VeloxShuffleReader.h" #include "shuffle/VeloxShuffleWriter.h" #include "utils/ConfigExtractor.h" #include "utils/VeloxArrowUtils.h" #include "utils/VeloxWholeStageDumper.h" +#include "velox/common/process/StackTrace.h" DECLARE_bool(velox_exception_user_stacktrace_enabled); DECLARE_bool(velox_memory_use_hugepages); @@ -62,6 +69,135 @@ using namespace facebook; namespace gluten { +namespace { + +class HookedExecutor final : public folly::Executor { + public: + HookedExecutor(folly::Executor* parent, std::string name, bool debug) + : parent_(parent), name_(std::move(name)), debug_(debug) {} + + void add(folly::Func func) override { + GLUTEN_CHECK(parent_ != nullptr, "Parent executor is null."); + inFlight_.fetch_add(1, std::memory_order_relaxed); + parent_->add(wrap(std::move(func), 
0)); + } + + void addWithPriority(folly::Func func, int8_t priority) override { + GLUTEN_CHECK(parent_ != nullptr, "Parent executor is null."); + inFlight_.fetch_add(1, std::memory_order_relaxed); + parent_->addWithPriority(wrap(std::move(func), priority), priority); + } + + uint8_t getNumPriorities() const override { + return parent_ == nullptr ? 1 : parent_->getNumPriorities(); + } + + bool join(std::chrono::milliseconds timeout) { + std::unique_lock lock(mutex_); + return cv_.wait_for(lock, timeout, [&] { + return inFlight_.load(std::memory_order_acquire) == 0; + }); + } + + void dumpOutstandingTasks() const { + if (!debug_) { + return; + } + std::lock_guard lock(taskMutex_); + if (inFlightTasks_.empty()) { + LOG(WARNING) << "Hooked executor " << name_ << " timed out with no tracked in-flight tasks."; + return; + } + for (const auto& [taskId, info] : inFlightTasks_) { + const auto elapsedMs = std::chrono::duration_cast( + std::chrono::steady_clock::now() - info.enqueueTime) + .count(); + LOG(WARNING) << "Outstanding task in hooked executor " << name_ << ": taskId=" << taskId + << ", elapsedMs=" << elapsedMs << ", priority=" << static_cast(info.priority) + << ", submitStacktrace:\n" + << info.submitStacktrace; + } + } + + private: + struct TaskInfo { + std::chrono::steady_clock::time_point enqueueTime; + int8_t priority; + std::string submitStacktrace; + }; + + folly::Func wrap(folly::Func func, int8_t priority) { + auto* self = this; + const auto taskId = nextTaskId_.fetch_add(1, std::memory_order_relaxed); + if (debug_) { + TaskInfo info{ + .enqueueTime = std::chrono::steady_clock::now(), + .priority = priority, + .submitStacktrace = velox::process::StackTrace().toString()}; + std::lock_guard lock(taskMutex_); + inFlightTasks_[taskId] = std::move(info); + } + return [func = std::move(func), self, taskId]() mutable { + auto done = folly::makeGuard([&] { + if (self->debug_) { + std::lock_guard lock(self->taskMutex_); + self->inFlightTasks_.erase(taskId); + } + /* Decrement while holding mutex_: join() evaluates its predicate under the same mutex, so it cannot observe zero (and let the owner destroy this executor) before this callback has finished using mutex_ and cv_. */ std::lock_guard lock(self->mutex_); + if 
(self->inFlight_.fetch_sub(1, std::memory_order_acq_rel) == 1) { + self->cv_.notify_all(); + } + }); + func(); + }; + } + + folly::Executor* parent_; + std::string name_; + bool debug_; + std::atomic nextTaskId_{0}; + std::atomic inFlight_{0}; + std::mutex mutex_; + std::condition_variable cv_; + mutable std::mutex taskMutex_; + std::unordered_map inFlightTasks_; +}; + +std::unique_ptr makeHookedExecutor(folly::Executor* parent, const std::string& name, bool debug) { + if (parent == nullptr) { + return nullptr; + } + return std::make_unique(parent, name, debug); +} + +void joinHookedExecutor(std::unique_ptr& executor, std::chrono::milliseconds timeout, bool debug) { + if (executor == nullptr) { + return; + } + auto* hookedExecutor = dynamic_cast(executor.get()); + GLUTEN_CHECK(hookedExecutor != nullptr, "Expected HookedExecutor"); + if (!hookedExecutor->join(timeout)) { + LOG(WARNING) << "Timed out waiting for hooked executor to finish after " << timeout.count() << " ms."; + if (debug) { + hookedExecutor->dumpOutstandingTasks(); + } + } + executor.reset(); +} + +std::string makeScopedConnectorId(const std::string& base, uint64_t runtimeId) { + return fmt::format("{}-runtime-{}", base, runtimeId); +} + +VeloxConnectorIds makeScopedConnectorIds(uint64_t runtimeId) { + return VeloxConnectorIds{ + .hive = makeScopedConnectorId(kHiveConnectorId, runtimeId), + .iterator = makeScopedConnectorId(kIteratorConnectorId, runtimeId), + .cudfHive = makeScopedConnectorId(kCudfHiveConnectorId, runtimeId)}; +} + +} // namespace + VeloxRuntime::VeloxRuntime( const std::string& kind, VeloxMemoryManager* vmm, @@ -80,6 +216,82 @@ VeloxRuntime::VeloxRuntime( FLAGS_velox_memory_use_hugepages = veloxCfg_->get(kMemoryUseHugePages, FLAGS_velox_memory_use_hugepages); FLAGS_velox_memory_pool_capacity_transfer_across_tasks = veloxCfg_->get( kMemoryPoolCapacityTransferAcrossTasks, FLAGS_velox_memory_pool_capacity_transfer_across_tasks); + + static 
std::atomic runtimeId{0}; + connectorIds_ = makeScopedConnectorIds(runtimeId++); + + initializeExecutors(); + registerConnectors(); +} + +VeloxRuntime::~VeloxRuntime() { + const auto timeoutMs = + veloxCfg_->get(kVeloxAsyncTimeoutOnTaskStopping, kVeloxAsyncTimeoutOnTaskStoppingDefault); + const auto timeout = std::chrono::milliseconds(timeoutMs); + joinHookedExecutor(executor_, timeout, debugModeEnabled_); + joinHookedExecutor(spillExecutor_, timeout, debugModeEnabled_); + joinHookedExecutor(ioExecutor_, timeout, debugModeEnabled_); + unregisterConnectors(); +} + +void VeloxRuntime::initializeExecutors() { + executor_ = makeHookedExecutor(VeloxBackend::get()->executor(), kind_ + ".executor", debugModeEnabled_); + spillExecutor_ = makeHookedExecutor(VeloxBackend::get()->spillExecutor(), kind_ + ".spill", debugModeEnabled_); + ioExecutor_ = makeHookedExecutor(VeloxBackend::get()->ioExecutor(), kind_ + ".io", debugModeEnabled_); +} + +void VeloxRuntime::registerConnectors() { + auto* backend = VeloxBackend::get(); + connectorIds_.hiveRegistered = velox::connector::registerConnector( + backend->createHiveConnector(connectorIds_.hive, ioExecutor_.get())); + GLUTEN_CHECK( + connectorIds_.hiveRegistered, + "Failed to register scoped hive connector: " + connectorIds_.hive); + GLUTEN_CHECK( + velox::connector::hasConnector(connectorIds_.hive), + "Scoped hive connector not found after registration: " + connectorIds_.hive); + + const auto valueStreamDynamicFilterEnabled = + veloxCfg_->get(kValueStreamDynamicFilterEnabled, kValueStreamDynamicFilterEnabledDefault); + connectorIds_.iteratorRegistered = velox::connector::registerConnector( + backend->createValueStreamConnector(connectorIds_.iterator, valueStreamDynamicFilterEnabled)); + GLUTEN_CHECK( + connectorIds_.iteratorRegistered, + "Failed to register scoped iterator connector: " + connectorIds_.iterator); + GLUTEN_CHECK( + velox::connector::hasConnector(connectorIds_.iterator), + "Scoped iterator connector not found after 
registration: " + connectorIds_.iterator); + +#ifdef GLUTEN_ENABLE_GPU + if (veloxCfg_->get(kCudfEnableTableScan, kCudfEnableTableScanDefault) && + veloxCfg_->get(kCudfEnabled, kCudfEnabledDefault)) { + connectorIds_.cudfHiveRegistered = velox::connector::registerConnector( + backend->createCudfHiveConnector(connectorIds_.cudfHive, ioExecutor_.get())); + GLUTEN_CHECK( + connectorIds_.cudfHiveRegistered, + "Failed to register scoped cudf hive connector: " + connectorIds_.cudfHive); + GLUTEN_CHECK( + velox::connector::hasConnector(connectorIds_.cudfHive), + "Scoped cudf hive connector not found after registration: " + connectorIds_.cudfHive); + } +#endif +} + +void VeloxRuntime::unregisterConnectors() { +#ifdef GLUTEN_ENABLE_GPU + if (connectorIds_.cudfHiveRegistered) { + velox::connector::unregisterConnector(connectorIds_.cudfHive); + connectorIds_.cudfHiveRegistered = false; + } +#endif + if (connectorIds_.iteratorRegistered) { + velox::connector::unregisterConnector(connectorIds_.iterator); + connectorIds_.iteratorRegistered = false; + } + if (connectorIds_.hiveRegistered) { + velox::connector::unregisterConnector(connectorIds_.hive); + connectorIds_.hiveRegistered = false; + } } void VeloxRuntime::parsePlan(const uint8_t* data, int32_t size) { @@ -153,7 +365,14 @@ break; std::string VeloxRuntime::planString(bool details, const std::unordered_map& sessionConf) { auto veloxMemoryPool = gluten::defaultLeafVeloxMemoryPool(); - VeloxPlanConverter veloxPlanConverter(veloxMemoryPool.get(), veloxCfg_.get(), {}, std::nullopt, std::nullopt, true); + VeloxPlanConverter veloxPlanConverter( + veloxMemoryPool.get(), + veloxCfg_.get(), + {}, + connectorIds_, + std::nullopt, + std::nullopt, + true); auto veloxPlan = veloxPlanConverter.toVeloxPlan(substraitPlan_, localFiles_); return veloxPlan->toString(details, true); } @@ -173,6 +392,7 @@ std::shared_ptr VeloxRuntime::createResultIterator( memoryManager()->getLeafMemoryPool().get(), veloxCfg_.get(), inputs, + connectorIds_, 
*localWriteFilesTempPath(), *localWriteFileName()); veloxPlan_ = veloxPlanConverter.toVeloxPlan(substraitPlan_, std::move(localFiles_)); @@ -194,6 +414,9 @@ std::shared_ptr VeloxRuntime::createResultIterator( scanIds, scanInfos, streamIds, + executor_.get(), + spillExecutor_.get(), + connectorIds_, spillDir, veloxCfg_, taskInfo_.has_value() ? taskInfo_.value() : SparkTaskInfo{}); diff --git a/cpp/velox/compute/VeloxRuntime.h b/cpp/velox/compute/VeloxRuntime.h index 728cc46c92a4..ea5e3b5186d3 100644 --- a/cpp/velox/compute/VeloxRuntime.h +++ b/cpp/velox/compute/VeloxRuntime.h @@ -19,9 +19,11 @@ #include "WholeStageResultIterator.h" #include "compute/Runtime.h" +#include "compute/VeloxConnectorIds.h" #ifdef GLUTEN_ENABLE_ENHANCED_FEATURES #include "iceberg/IcebergWriter.h" #endif +#include #include "memory/VeloxMemoryManager.h" #include "operators/serializer/VeloxColumnarBatchSerializer.h" #include "operators/serializer/VeloxColumnarToRowConverter.h" @@ -42,6 +44,8 @@ class VeloxRuntime final : public Runtime { VeloxMemoryManager* vmm, const std::unordered_map& confMap); + ~VeloxRuntime() override; + void setSparkTaskInfo(SparkTaskInfo taskInfo) override { static std::atomic vtId{0}; taskInfo.vId = vtId++; @@ -116,6 +120,22 @@ class VeloxRuntime final : public Runtime { return debugModeEnabled_; } + folly::Executor* executor() const { + return executor_.get(); + } + + folly::Executor* spillExecutor() const { + return spillExecutor_.get(); + } + + folly::Executor* ioExecutor() const { + return ioExecutor_.get(); + } + + const VeloxConnectorIds& connectorIds() const { + return connectorIds_; + } + static void getInfoAndIds( const std::unordered_map>& splitInfoMap, const std::unordered_set& leafPlanNodeIds, @@ -124,9 +144,17 @@ class VeloxRuntime final : public Runtime { std::vector& streamIds); private: + void initializeExecutors(); + void registerConnectors(); + void unregisterConnectors(); + std::shared_ptr veloxPlan_; std::shared_ptr veloxCfg_; bool 
debugModeEnabled_{false}; + std::unique_ptr executor_; + std::unique_ptr spillExecutor_; + std::unique_ptr ioExecutor_; + VeloxConnectorIds connectorIds_; std::unordered_map> emptySchemaBatchLoopUp_; }; diff --git a/cpp/velox/compute/WholeStageResultIterator.cc b/cpp/velox/compute/WholeStageResultIterator.cc index 3c0505263159..1d1d09e0ef34 100644 --- a/cpp/velox/compute/WholeStageResultIterator.cc +++ b/cpp/velox/compute/WholeStageResultIterator.cc @@ -76,6 +76,9 @@ WholeStageResultIterator::WholeStageResultIterator( const std::vector& scanNodeIds, const std::vector>& scanInfos, const std::vector& streamIds, + folly::Executor* executor, + folly::Executor* spillExecutor, + VeloxConnectorIds connectorIds, const std::string spillDir, const std::shared_ptr& veloxCfg, const SparkTaskInfo& taskInfo) @@ -85,15 +88,14 @@ WholeStageResultIterator::WholeStageResultIterator( enableCudf_(veloxCfg_->get(kCudfEnabled, kCudfEnabledDefault)), #endif taskInfo_(taskInfo), + executor_(executor), veloxPlan_(planNode), + spillExecutor_(spillExecutor), + connectorIds_(std::move(connectorIds)), scanNodeIds_(scanNodeIds), scanInfos_(scanInfos), streamIds_(streamIds) { spillStrategy_ = veloxCfg_->get(kSpillStrategy, kSpillStrategyDefaultValue); - auto spillThreadNum = veloxCfg_->get(kSpillThreadNum, kSpillThreadNumDefaultValue); - if (spillThreadNum > 0) { - spillExecutor_ = std::make_shared(spillThreadNum); - } getOrderedNodeIds(veloxPlan_, orderedNodeIds_); auto fileSystem = velox::filesystems::getFileSystem(spillDir, nullptr); @@ -160,7 +162,7 @@ WholeStageResultIterator::WholeStageResultIterator( std::unordered_map customSplitInfo{{"table_format", "hive-iceberg"}}; auto deleteFiles = icebergSplitInfo->deleteFilesVec[idx]; split = std::make_shared( - kHiveConnectorId, + connectorIds_.hive, paths[idx], format, starts[idx], @@ -174,11 +176,11 @@ WholeStageResultIterator::WholeStageResultIterator( metadataColumn, properties[idx]); } else { - auto connectorId = kHiveConnectorId; + auto 
connectorId = connectorIds_.hive; #ifdef GLUTEN_ENABLE_GPU if (canUseCudfConnector && enableCudf_ && veloxCfg_->get(kCudfEnableTableScan, kCudfEnableTableScanDefault)) { - connectorId = kCudfHiveConnectorId; + connectorId = connectorIds_.cudfHive; } #endif split = std::make_shared( @@ -213,14 +215,21 @@ WholeStageResultIterator::WholeStageResultIterator( std::shared_ptr WholeStageResultIterator::createNewVeloxQueryCtx() { std::unordered_map> connectorConfigs; - connectorConfigs[kHiveConnectorId] = createHiveConnectorSessionConfig(veloxCfg_); + auto hiveSessionConfig = createHiveConnectorSessionConfig(veloxCfg_); + connectorConfigs[connectorIds_.hive] = hiveSessionConfig; + connectorConfigs[connectorIds_.iterator] = hiveSessionConfig; +#ifdef GLUTEN_ENABLE_GPU + if (!connectorIds_.cudfHive.empty()) { + connectorConfigs[connectorIds_.cudfHive] = hiveSessionConfig; + } +#endif std::shared_ptr ctx = velox::core::QueryCtx::create( - nullptr, + executor_, facebook::velox::core::QueryConfig{getQueryContextConf()}, connectorConfigs, gluten::VeloxBackend::get()->getAsyncDataCache(), memoryManager_->getAggregateMemoryPool(), - spillExecutor_.get(), + spillExecutor_, fmt::format( "Gluten_Stage_{}_TID_{}_VTID_{}", std::to_string(taskInfo_.stageId), @@ -365,7 +374,7 @@ void WholeStageResultIterator::addIteratorSplits(const std::vector( - kIteratorConnectorId, inputIterators[i]); + connectorIds_.iterator, inputIterators[i]); exec::Split split(folly::copy(connectorSplit), -1); task_->addSplit(streamIds_[i], std::move(split)); } diff --git a/cpp/velox/compute/WholeStageResultIterator.h b/cpp/velox/compute/WholeStageResultIterator.h index 9bb6ef8b11f7..033583922309 100644 --- a/cpp/velox/compute/WholeStageResultIterator.h +++ b/cpp/velox/compute/WholeStageResultIterator.h @@ -16,7 +16,9 @@ */ #pragma once +#include #include "compute/Runtime.h" +#include "compute/VeloxConnectorIds.h" #include "iceberg/IcebergPlanConverter.h" #include "memory/SplitAwareColumnarBatchIterator.h" 
#include "memory/VeloxColumnarBatch.h" @@ -41,6 +43,9 @@ class WholeStageResultIterator : public SplitAwareColumnarBatchIterator { const std::vector& scanNodeIds, const std::vector>& scanInfos, const std::vector& streamIds, + folly::Executor* executor, + folly::Executor* spillExecutor, + VeloxConnectorIds connectorIds, const std::string spillDir, const std::shared_ptr& veloxCfg, const SparkTaskInfo& taskInfo); @@ -126,12 +131,14 @@ class WholeStageResultIterator : public SplitAwareColumnarBatchIterator { const bool enableCudf_; #endif const SparkTaskInfo taskInfo_; + folly::Executor* executor_; std::shared_ptr task_; std::shared_ptr veloxPlan_; /// Spill. std::string spillStrategy_; - std::shared_ptr spillExecutor_ = nullptr; + folly::Executor* spillExecutor_ = nullptr; + VeloxConnectorIds connectorIds_; /// Metrics std::unique_ptr metrics_{}; diff --git a/cpp/velox/memory/VeloxMemoryManager.cc b/cpp/velox/memory/VeloxMemoryManager.cc index d829516e0dde..f628b8cdaede 100644 --- a/cpp/velox/memory/VeloxMemoryManager.cc +++ b/cpp/velox/memory/VeloxMemoryManager.cc @@ -31,8 +31,6 @@ #include "memory/ArrowMemoryPool.h" #include "utils/Exception.h" -DECLARE_int32(gluten_velox_async_timeout_on_task_stopping); - namespace gluten { using namespace facebook; @@ -443,26 +441,9 @@ bool VeloxMemoryManager::tryDestructSafe() { } VeloxMemoryManager::~VeloxMemoryManager() { - static const uint32_t kWaitTimeoutMs = FLAGS_gluten_velox_async_timeout_on_task_stopping; // 30s by default - uint32_t accumulatedWaitMs = 0UL; - bool destructed = false; - for (int32_t tryCount = 0; accumulatedWaitMs < kWaitTimeoutMs; tryCount++) { - destructed = tryDestructSafe(); - if (destructed) { - if (tryCount > 0) { - LOG(INFO) << "All the outstanding memory resources successfully released. "; - } - break; - } - uint32_t waitMs = 50 * static_cast(pow(1.5, tryCount)); // 50ms, 75ms, 112.5ms ... - LOG(INFO) << "There are still outstanding Velox memory allocations. 
Waiting for " << waitMs - << " ms to let possible async tasks done... "; - usleep(waitMs * 1000); - accumulatedWaitMs += waitMs; - } + bool destructed = tryDestructSafe(); if (!destructed) { - LOG(ERROR) << "Failed to release Velox memory manager after " << accumulatedWaitMs - << "ms as there are still outstanding memory resources. "; + LOG(ERROR) << "Failed to release Velox memory manager as there are still outstanding memory resources. "; } #ifdef ENABLE_JEMALLOC_STATS malloc_stats_print(nullptr, nullptr, nullptr); diff --git a/cpp/velox/substrait/SubstraitToVeloxPlan.cc b/cpp/velox/substrait/SubstraitToVeloxPlan.cc index e9a2417d92d6..78f95ff5b159 100644 --- a/cpp/velox/substrait/SubstraitToVeloxPlan.cc +++ b/cpp/velox/substrait/SubstraitToVeloxPlan.cc @@ -824,7 +824,7 @@ core::PlanNodePtr SubstraitToVeloxPlanConverter::toVeloxPlan(const ::substrait:: const auto& compressionKind = writerOptions->compressionKind.value_or(common::CompressionKind::CompressionKind_SNAPPY); std::shared_ptr tableHandle = std::make_shared( - kHiveConnectorId, + connectorIds_.hive, makeHiveInsertTableHandle( tableColumnNames, /*inputType->names() clolumn name is different*/ inputType->children(), @@ -1356,7 +1356,7 @@ core::PlanNodePtr SubstraitToVeloxPlanConverter::constructValueStreamNode( // Create TableHandle bool dynamicFilterEnabled = veloxCfg_->get(kValueStreamDynamicFilterEnabled, kValueStreamDynamicFilterEnabledDefault); - auto tableHandle = std::make_shared(kIteratorConnectorId, dynamicFilterEnabled); + auto tableHandle = std::make_shared(connectorIds_.iterator, dynamicFilterEnabled); // Create column assignments connector::ColumnHandleMap assignments; @@ -1525,11 +1525,11 @@ core::PlanNodePtr SubstraitToVeloxPlanConverter::toVeloxPlan(const ::substrait:: connector::ConnectorTableHandlePtr tableHandle; auto remainingFilter = readRel.has_filter() ? 
exprConverter_->toVeloxExpr(readRel.filter(), baseSchema) : nullptr; - auto connectorId = kHiveConnectorId; + auto connectorId = connectorIds_.hive; if (useCudfTableHandle(splitInfos_) && veloxCfg_->get(kCudfEnableTableScan, kCudfEnableTableScanDefault) && veloxCfg_->get(kCudfEnabled, kCudfEnabledDefault)) { #ifdef GLUTEN_ENABLE_GPU - connectorId = kCudfHiveConnectorId; + connectorId = connectorIds_.cudfHive; #endif } common::SubfieldFilters subfieldFilters; diff --git a/cpp/velox/substrait/SubstraitToVeloxPlan.h b/cpp/velox/substrait/SubstraitToVeloxPlan.h index 47bf3a0525b1..373601916d4d 100644 --- a/cpp/velox/substrait/SubstraitToVeloxPlan.h +++ b/cpp/velox/substrait/SubstraitToVeloxPlan.h @@ -19,6 +19,7 @@ #include "SubstraitToVeloxExpr.h" #include "TypeUtils.h" +#include "compute/VeloxConnectorIds.h" #include "velox/connectors/hive/FileProperties.h" #include "velox/connectors/hive/TableHandle.h" #include "velox/core/PlanNode.h" @@ -80,12 +81,14 @@ class SubstraitToVeloxPlanConverter { memory::MemoryPool* pool, const facebook::velox::config::ConfigBase* veloxCfg, const std::vector>& inputIters, + VeloxConnectorIds connectorIds, const std::optional writeFilesTempPath = std::nullopt, const std::optional writeFileName = std::nullopt, bool validationMode = false) : pool_(pool), veloxCfg_(veloxCfg), inputIters_(inputIters), + connectorIds_(std::move(connectorIds)), writeFilesTempPath_(writeFilesTempPath), writeFileName_(writeFileName), validationMode_(validationMode) { @@ -308,6 +311,8 @@ class SubstraitToVeloxPlanConverter { /// Input row-vectors for query trace mode (ValuesNode / cuDF ValueStream support) std::vector> inputIters_; + VeloxConnectorIds connectorIds_; + /// The temporary path used to write files. 
std::optional writeFilesTempPath_; std::optional writeFileName_; diff --git a/cpp/velox/substrait/SubstraitToVeloxPlanValidator.h b/cpp/velox/substrait/SubstraitToVeloxPlanValidator.h index 8afc7c5bf8b2..7f883e918c18 100644 --- a/cpp/velox/substrait/SubstraitToVeloxPlanValidator.h +++ b/cpp/velox/substrait/SubstraitToVeloxPlanValidator.h @@ -19,6 +19,8 @@ #include #include "SubstraitToVeloxPlan.h" +#include "config/VeloxConfig.h" +#include "operators/plannodes/IteratorSplit.h" #include "velox/core/QueryCtx.h" using namespace facebook; @@ -34,7 +36,13 @@ class SubstraitToVeloxPlanValidator { {velox::core::QueryConfig::kSparkPartitionId, "0"}, {velox::core::QueryConfig::kSessionTimezone, "GMT"}}; veloxCfg_ = std::make_shared(std::move(configs)); planConverter_ = std::make_unique( - pool, veloxCfg_.get(), std::vector>{}, std::nullopt, std::nullopt, true); + pool, + veloxCfg_.get(), + std::vector>{}, + VeloxConnectorIds{.hive = kHiveConnectorId, .iterator = kIteratorConnectorId, .cudfHive = kCudfHiveConnectorId}, + std::nullopt, + std::nullopt, + true); queryCtx_ = velox::core::QueryCtx::create(nullptr, velox::core::QueryConfig(veloxCfg_->rawConfigs())); // An execution context used for function validation. execCtx_ = std::make_unique(pool, queryCtx_.get()); diff --git a/docs/velox-configuration.md b/docs/velox-configuration.md index 859c6356c17d..5a1375f392e4 100644 --- a/docs/velox-configuration.md +++ b/docs/velox-configuration.md @@ -15,7 +15,7 @@ nav_order: 16 | spark.gluten.sql.columnar.backend.velox.SplitPreloadPerDriver | 2 | The split preload per task | | spark.gluten.sql.columnar.backend.velox.abandonPartialAggregationMinPct | 90 | If partial aggregation aggregationPct greater than this value, partial aggregation may be early abandoned. Note: this option only works when flushable partial aggregation is enabled. Ignored when spark.gluten.sql.columnar.backend.velox.flushablePartialAggregation=false. 
| | spark.gluten.sql.columnar.backend.velox.abandonPartialAggregationMinRows | 100000 | If partial aggregation input rows number greater than this value, partial aggregation may be early abandoned. Note: this option only works when flushable partial aggregation is enabled. Ignored when spark.gluten.sql.columnar.backend.velox.flushablePartialAggregation=false. | -| spark.gluten.sql.columnar.backend.velox.asyncTimeoutOnTaskStopping | 30000ms | Timeout for asynchronous execution when task is being stopped in Velox backend. It's recommended to set to a number larger than network connection timeout that the possible aysnc tasks are relying on. | +| spark.gluten.sql.columnar.backend.velox.asyncTimeoutOnTaskStopping | 30000ms | Timeout in milliseconds when waiting for runtime-scoped async work to finish during teardown. | | spark.gluten.sql.columnar.backend.velox.broadcastHashTableBuildThreads | 1 | The number of threads used to build the broadcast hash table. If not set or set to 0, it will use the default number of threads (available processors). | | spark.gluten.sql.columnar.backend.velox.cacheEnabled | false | Enable Velox cache, default off. It's recommended to enablesoft-affinity as well when enable velox cache. | | spark.gluten.sql.columnar.backend.velox.cachePrefetchMinPct | 0 | Set prefetch cache min pct for velox file scan | @@ -86,4 +86,3 @@ nav_order: 16 | spark.gluten.velox.abandonDedupHashMap.minPct | 0 | Experimental: abandon hashmap build if duplicated rows are more than this percentile. Value is integer based and range is [0, 100]. | | spark.gluten.velox.abandonDedupHashMap.minRows | 100000 | Experimental: abandon hashmap build if duplicated rows more than this number. | | spark.gluten.velox.offHeapBroadcastBuildRelation.enabled | false | Experimental: If enabled, broadcast build relation will use offheap memory. Otherwise, broadcast build relation will use onheap memory. | -