Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1910,6 +1910,14 @@ class MiscOperatorSuite extends VeloxWholeStageTransformerSuite with AdaptiveSpa
assert(plan2.find(_.isInstanceOf[ProjectExecTransformer]).isDefined)
}

// Exercises a date -> timestamp cast while the session-local timezone is set to "GMT".
// NOTE(review): runQueryAndCompare and checkGlutenPlan are defined outside this hunk;
// presumably the former compares the result against vanilla Spark and the latter asserts
// that the executed plan contains a ProjectExecTransformer — confirm against the suite base class.
test("cast date to timestamp with GMT session timezone") {
// Override the session timezone only for the duration of this block.
withSQLConf(SQLConf.SESSION_LOCAL_TIMEZONE.key -> "GMT") {
runQueryAndCompare("SELECT cast(date'2023-01-02 01:01:01' as timestamp) as ts") {
checkGlutenPlan[ProjectExecTransformer]
}
}
}

test("cast timestamp to date") {
val query = "select cast(ts as date) from values (timestamp'2024-01-01 00:00:00') as tab(ts)"
runQueryAndCompare(query) {
Expand Down
11 changes: 11 additions & 0 deletions cpp/core/config/GlutenConfig.cc
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,17 @@ parseConfMap(JNIEnv* env, const uint8_t* planData, const int32_t planDataLength)
return sparkConfs;
}

/// Rewrites a "GMT"-prefixed session timezone identifier to use a "UTC" prefix.
///
/// The rewrite applies in exactly two cases:
///   - the identifier is exactly "GMT"            -> "UTC"
///   - the identifier is "GMT" followed by a sign -> "UTC" + the rest
///     (e.g. "GMT+8" -> "UTC+8", "GMT-05:00" -> "UTC-05:00")
/// Every other input, including the empty string and names such as "GMT0"
/// or "Etc/GMT+8", is returned unchanged. Matching is case-sensitive.
///
/// @param sessionTimezone Timezone identifier to normalize.
/// @return The normalized identifier as an owning string.
std::string normalizeSessionTimezone(std::string_view sessionTimezone) {
  constexpr std::string_view gmtPrefix = "GMT";
  constexpr std::string_view utcPrefix = "UTC";

  // Identifiers that do not start with "GMT" are passed through as-is.
  if (sessionTimezone.compare(0, gmtPrefix.size(), gmtPrefix) != 0) {
    return std::string(sessionTimezone);
  }

  const std::string_view remainder = sessionTimezone.substr(gmtPrefix.size());

  // Bare "GMT".
  if (remainder.empty()) {
    return std::string(utcPrefix);
  }

  // "GMT+..." / "GMT-...": keep the sign and everything after it.
  const char sign = remainder.front();
  if (sign == '+' || sign == '-') {
    std::string normalized(utcPrefix);
    normalized.append(remainder);
    return normalized;
  }

  // "GMT" followed by anything else (e.g. "GMT0") is left alone.
  return std::string(sessionTimezone);
}

std::string printConfig(const std::unordered_map<std::string, std::string>& conf) {
std::ostringstream oss;
oss << std::endl;
Expand Down
3 changes: 3 additions & 0 deletions cpp/core/config/GlutenConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include <jni.h>
#include <cstdint>
#include <string>
#include <string_view>
#include <unordered_map>

namespace gluten {
Expand Down Expand Up @@ -102,5 +103,7 @@ const std::string kDebugCudfDefault = "false";
std::unordered_map<std::string, std::string>
parseConfMap(JNIEnv* env, const uint8_t* planData, const int32_t planDataLength);

std::string normalizeSessionTimezone(std::string_view sessionTimezone);

std::string printConfig(const std::unordered_map<std::string, std::string>& conf);
} // namespace gluten
15 changes: 8 additions & 7 deletions cpp/velox/compute/WholeStageResultIterator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,13 @@
#include "velox/exec/PlanNodeStats.h"
#ifdef GLUTEN_ENABLE_GPU
#include <cudf/io/types.hpp>
#include "cudf/GpuLock.h"
#include "velox/experimental/cudf/CudfConfig.h"
#include "velox/experimental/cudf/connectors/hive/CudfHiveConnectorSplit.h"
#include "velox/experimental/cudf/exec/ToCudf.h"
#include "cudf/GpuLock.h"
#endif
#include "operators/plannodes/RowVectorStream.h"


using namespace facebook;

namespace gluten {
Expand Down Expand Up @@ -358,14 +357,15 @@ void WholeStageResultIterator::constructPartitionColumns(
}

void WholeStageResultIterator::addIteratorSplits(const std::vector<std::shared_ptr<ResultIterator>>& inputIterators) {
GLUTEN_CHECK(!allSplitsAdded_, "Method addIteratorSplits should not be called since all splits has been added to the Velox task.");
GLUTEN_CHECK(
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please remove unrelated changes

!allSplitsAdded_,
"Method addIteratorSplits should not be called since all splits has been added to the Velox task.");
// Create IteratorConnectorSplit for each iterator
for (size_t i = 0; i < streamIds_.size() && i < inputIterators.size(); ++i) {
if (inputIterators[i] == nullptr) {
continue;
}
auto connectorSplit = std::make_shared<IteratorConnectorSplit>(
kIteratorConnectorId, inputIterators[i]);
auto connectorSplit = std::make_shared<IteratorConnectorSplit>(kIteratorConnectorId, inputIterators[i]);
exec::Split split(folly::copy(connectorSplit), -1);
task_->addSplit(streamIds_[i], std::move(split));
}
Expand All @@ -385,7 +385,7 @@ void WholeStageResultIterator::noMoreSplits() {
for (const auto& scanNodeId : scanNodeIds_) {
task_->noMoreSplits(scanNodeId);
}

// Mark no more splits for all stream nodes
for (const auto& streamId : streamIds_) {
task_->noMoreSplits(streamId);
Expand Down Expand Up @@ -575,7 +575,8 @@ std::unordered_map<std::string, std::string> WholeStageResultIterator::getQueryC
std::to_string(veloxCfg_->get<uint64_t>(kVeloxPreferredBatchBytes, 10L << 20));
try {
configs[velox::core::QueryConfig::kSparkAnsiEnabled] = veloxCfg_->get<std::string>(kAnsiEnabled, "false");
configs[velox::core::QueryConfig::kSessionTimezone] = veloxCfg_->get<std::string>(kSessionTimezone, "");
configs[velox::core::QueryConfig::kSessionTimezone] =
normalizeSessionTimezone(veloxCfg_->get<std::string>(kSessionTimezone, ""));
// Adjust timestamp according to the above configured session timezone.
configs[velox::core::QueryConfig::kAdjustTimestampToTimezone] = "true";

Expand Down
9 changes: 5 additions & 4 deletions cpp/velox/jni/VeloxJniWrapper.cc
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ Java_org_apache_gluten_vectorized_PlanEvaluatorJniWrapper_nativeValidateWithFail
}

const auto pool = defaultLeafVeloxMemoryPool().get();
SubstraitToVeloxPlanValidator planValidator(pool);
SubstraitToVeloxPlanValidator planValidator(pool, ctx->getConfMap());
::substrait::Plan subPlan;
parseProtobuf(planData, planSize, &subPlan);

Expand Down Expand Up @@ -226,8 +226,9 @@ JNIEXPORT jboolean JNICALL Java_org_apache_gluten_vectorized_PlanEvaluatorJniWra
env->DeleteLocalRef(mapping);
}

const auto ctx = getRuntime(env, wrapper);
auto pool = defaultLeafVeloxMemoryPool().get();
SubstraitToVeloxPlanValidator planValidator(pool);
SubstraitToVeloxPlanValidator planValidator(pool, ctx->getConfMap());
auto inputType = SubstraitParser::parseType(inputSubstraitType);
if (inputType->kind() != TypeKind::ROW) {
throw GlutenException("Input type is not a RowType.");
Expand Down Expand Up @@ -461,8 +462,8 @@ JNIEXPORT jlong JNICALL Java_org_apache_gluten_utils_GpuBufferBatchResizerJniWra
auto arrowPool = dynamic_cast<VeloxMemoryManager*>(ctx->memoryManager())->defaultArrowMemoryPool();
auto pool = dynamic_cast<VeloxMemoryManager*>(ctx->memoryManager())->getLeafMemoryPool();
auto iter = makeJniColumnarBatchIterator(env, jIter, ctx);
auto appender = std::make_shared<ResultIterator>(
std::make_unique<GpuBufferBatchResizer>(arrowPool, pool.get(), minOutputBatchSize, maxPrefetchBatchBytes, std::move(iter)));
auto appender = std::make_shared<ResultIterator>(std::make_unique<GpuBufferBatchResizer>(
arrowPool, pool.get(), minOutputBatchSize, maxPrefetchBatchBytes, std::move(iter)));
return ctx->saveObject(appender);
JNI_METHOD_END(kInvalidObjectHandle)
}
Expand Down
11 changes: 9 additions & 2 deletions cpp/velox/substrait/SubstraitToVeloxPlanValidator.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

#include <unordered_map>
#include "SubstraitToVeloxPlan.h"
#include "config/GlutenConfig.h"
#include "velox/core/QueryCtx.h"

using namespace facebook;
Expand All @@ -29,9 +30,15 @@ namespace gluten {
/// a Substrait plan is supported in Velox.
class SubstraitToVeloxPlanValidator {
public:
SubstraitToVeloxPlanValidator(memory::MemoryPool* pool) {
explicit SubstraitToVeloxPlanValidator(
memory::MemoryPool* pool,
const std::unordered_map<std::string, std::string>& confMap = {}) {
const auto it = confMap.find(kSessionTimezone);
const auto sessionTimezone =
normalizeSessionTimezone(it == confMap.end() ? std::string_view("UTC") : std::string_view(it->second));
std::unordered_map<std::string, std::string> configs{
{velox::core::QueryConfig::kSparkPartitionId, "0"}, {velox::core::QueryConfig::kSessionTimezone, "GMT"}};
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updating the GMT to another value is enough.

{velox::core::QueryConfig::kSparkPartitionId, "0"},
{velox::core::QueryConfig::kSessionTimezone, sessionTimezone}};
veloxCfg_ = std::make_shared<facebook::velox::config::ConfigBase>(std::move(configs));
planConverter_ = std::make_unique<SubstraitToVeloxPlanConverter>(
pool, veloxCfg_.get(), std::vector<std::shared_ptr<ResultIterator>>{}, std::nullopt, std::nullopt, true);
Expand Down
4 changes: 3 additions & 1 deletion cpp/velox/utils/VeloxWriterUtils.cc
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,9 @@ std::unique_ptr<WriterOptions> makeParquetWriteOption(const std::unordered_map<s
writeOption->flushPolicyFactory = [maxRowGroupRows, maxRowGroupBytes]() {
return std::make_unique<LambdaFlushPolicy>(maxRowGroupRows, maxRowGroupBytes, [&]() { return false; });
};
writeOption->parquetWriteTimestampTimeZone = getConfigValue(sparkConfs, kSessionTimezone, std::nullopt);
if (auto it = sparkConfs.find(kSessionTimezone); it != sparkConfs.end()) {
writeOption->parquetWriteTimestampTimeZone = normalizeSessionTimezone(it->second);
}
writeOption->arrowMemoryPool =
getDefaultMemoryManager()->getOrCreateArrowMemoryPool("VeloxParquetWrite.ArrowMemoryPool");
if (auto it = sparkConfs.find(kParquetDataPageSize); it != sparkConfs.end()) {
Expand Down
Loading