diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RemoveUnexistingFilesAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RemoveUnexistingFilesAction.java index c67ddd7ea372..5639da0871fc 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RemoveUnexistingFilesAction.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RemoveUnexistingFilesAction.java @@ -22,6 +22,7 @@ import org.apache.paimon.flink.sink.Committable; import org.apache.paimon.flink.sink.CommittableTypeInfo; import org.apache.paimon.flink.utils.BoundedOneInputOperator; +import org.apache.paimon.flink.utils.StreamExecutionEnvironmentUtils; import org.apache.paimon.io.CompactIncrement; import org.apache.paimon.io.DataFileMeta; import org.apache.paimon.io.DataIncrement; @@ -108,7 +109,8 @@ public DataStream buildDataStream() throws Exception { List binaryPartitions = fileStoreTable.newScan().listPartitions(); SingleOutputStreamOperator source = - env.fromData( + StreamExecutionEnvironmentUtils.fromData( + env, binaryPartitions.stream() .map(BinaryRow::toBytes) .collect(Collectors.toList()), diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/btree/BTreeIndexTopoBuilder.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/btree/BTreeIndexTopoBuilder.java index 35830faac19b..dbdfdc8fe06e 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/btree/BTreeIndexTopoBuilder.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/btree/BTreeIndexTopoBuilder.java @@ -34,6 +34,7 @@ import org.apache.paimon.flink.sorter.TableSorter; import org.apache.paimon.flink.utils.BoundedOneInputOperator; import org.apache.paimon.flink.utils.JavaTypeInfo; +import org.apache.paimon.flink.utils.StreamExecutionEnvironmentUtils; import org.apache.paimon.globalindex.GlobalIndexParallelWriter; import org.apache.paimon.globalindex.btree.BTreeGlobalIndexBuilder; import org.apache.paimon.globalindex.btree.BTreeIndexOptions; @@ -211,7 +212,10 @@ protected static DataStream executeForPartitionRange( parallelism = Math.min(parallelism, maxParallelism); DataStream sourceStream = - env.fromData(new JavaTypeInfo<>(Split.class), rangeSplits.toArray(new Split[0])) + StreamExecutionEnvironmentUtils.fromData( + env, + new JavaTypeInfo<>(Split.class), + rangeSplits.toArray(new Split[0])) .name("Global Index Source " + " range=" + range) .setParallelism(1); diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/globalindex/GenericIndexTopoBuilder.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/globalindex/GenericIndexTopoBuilder.java index 865280763c6e..bcbabac793a2 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/globalindex/GenericIndexTopoBuilder.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/globalindex/GenericIndexTopoBuilder.java @@ -28,6 +28,7 @@ import org.apache.paimon.flink.sink.StoreCommitter; import org.apache.paimon.flink.utils.BoundedOneInputOperator; import org.apache.paimon.flink.utils.JavaTypeInfo; +import org.apache.paimon.flink.utils.StreamExecutionEnvironmentUtils; import org.apache.paimon.globalindex.GlobalIndexSingletonWriter; import org.apache.paimon.globalindex.ResultEntry; import org.apache.paimon.index.IndexFileMeta; @@ -179,7 +180,8 @@ public static boolean buildIndex( ReadBuilder readBuilder = table.newReadBuilder().withReadType(projectedRowType); DataStream source = - env.fromData( + StreamExecutionEnvironmentUtils.fromData( + env, new JavaTypeInfo<>(ShardTask.class), shardTasks.toArray(new ShardTask[0])) .name("Generic Index Source") @@ -201,7 +203,8 @@ public static boolean buildIndex( if (!deletedIndexEntries.isEmpty()) { List deleteCommittables = createDeleteCommittables(deletedIndexEntries); DataStream deletes = - env.fromData( + StreamExecutionEnvironmentUtils.fromData( + env, new CommittableTypeInfo(), deleteCommittables.toArray(new Committable[0])) .name("Index Delete Source") diff --git a/paimon-flink/paimon-flink1-common/src/main/java/org/apache/paimon/flink/utils/StreamExecutionEnvironmentUtils.java b/paimon-flink/paimon-flink1-common/src/main/java/org/apache/paimon/flink/utils/StreamExecutionEnvironmentUtils.java new file mode 100644 index 000000000000..c602bd316263 --- /dev/null +++ b/paimon-flink/paimon-flink1-common/src/main/java/org/apache/paimon/flink/utils/StreamExecutionEnvironmentUtils.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.utils; + +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.streaming.api.datastream.DataStreamSource; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; + +import java.util.Arrays; +import java.util.Collection; + +/** Utility methods about {@link StreamExecutionEnvironment} to resolve compatibility issues. */ +public class StreamExecutionEnvironmentUtils { + + @SafeVarargs + public static DataStreamSource fromData( + StreamExecutionEnvironment env, TypeInformation typeInfo, T... data) { + return env.fromCollection(Arrays.asList(data), typeInfo); + } + + public static DataStreamSource fromData( + StreamExecutionEnvironment env, Collection data, TypeInformation typeInfo) { + return env.fromCollection(data, typeInfo); + } +} diff --git a/paimon-flink/paimon-flink2-common/src/main/java/org/apache/paimon/flink/utils/StreamExecutionEnvironmentUtils.java b/paimon-flink/paimon-flink2-common/src/main/java/org/apache/paimon/flink/utils/StreamExecutionEnvironmentUtils.java new file mode 100644 index 000000000000..1fb5edf496a3 --- /dev/null +++ b/paimon-flink/paimon-flink2-common/src/main/java/org/apache/paimon/flink/utils/StreamExecutionEnvironmentUtils.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.utils; + +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.streaming.api.datastream.DataStreamSource; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; + +import java.util.Collection; + +/** Utility methods about {@link StreamExecutionEnvironment} to resolve compatibility issues. */ +public class StreamExecutionEnvironmentUtils { + + @SafeVarargs + public static DataStreamSource fromData( + StreamExecutionEnvironment env, TypeInformation typeInfo, T... data) { + return env.fromData(typeInfo, data); + } + + public static DataStreamSource fromData( + StreamExecutionEnvironment env, Collection data, TypeInformation typeInfo) { + return env.fromData(data, typeInfo); + } +}