Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -48,18 +48,21 @@ public class CatalogContext implements Serializable {
private final SerializableConfiguration hadoopConf;
@Nullable private final FileIOLoader preferIOLoader;
@Nullable private final FileIOLoader fallbackIOLoader;
@Nullable private final String catalogName;

private CatalogContext(
Options options,
@Nullable Configuration hadoopConf,
@Nullable FileIOLoader preferIOLoader,
@Nullable FileIOLoader fallbackIOLoader) {
@Nullable FileIOLoader fallbackIOLoader,
@Nullable String catalogName) {
this.options = checkNotNull(options);
this.hadoopConf =
new SerializableConfiguration(
hadoopConf == null ? getHadoopConfiguration(options) : hadoopConf);
this.preferIOLoader = preferIOLoader;
this.fallbackIOLoader = fallbackIOLoader;
this.catalogName = catalogName;
}

public static CatalogContext create(Path warehouse) {
Expand All @@ -69,28 +72,43 @@ public static CatalogContext create(Path warehouse) {
}

public static CatalogContext create(Options options) {
return new CatalogContext(options, null, null, null);
return new CatalogContext(options, null, null, null, null);
}

public static CatalogContext create(Options options, Configuration hadoopConf) {
return new CatalogContext(options, hadoopConf, null, null);
return new CatalogContext(options, hadoopConf, null, null, null);
}

/**
 * Creates a {@link CatalogContext} backed by the given options and Hadoop configuration.
 *
 * @param options catalog options; must not be null (checked in the constructor)
 * @param hadoopConf Hadoop configuration to use; if null the constructor derives one from options
 * @param catalogName logical name of the catalog this context belongs to, or null if unknown
 */
public static CatalogContext create(
        Options options, Configuration hadoopConf, @Nullable String catalogName) {
    return new CatalogContext(options, hadoopConf, null, null, catalogName);
}

public static CatalogContext create(Options options, FileIOLoader fallbackIOLoader) {
return new CatalogContext(options, null, null, fallbackIOLoader);
return new CatalogContext(options, null, null, fallbackIOLoader, null);
}

public static CatalogContext create(
Options options, FileIOLoader preferIOLoader, FileIOLoader fallbackIOLoader) {
return new CatalogContext(options, null, preferIOLoader, fallbackIOLoader);
return new CatalogContext(options, null, preferIOLoader, fallbackIOLoader, null);
}

/**
 * Creates a {@link CatalogContext} with explicit preferred and fallback {@link FileIOLoader}s.
 *
 * <p>No Hadoop configuration is supplied here; the constructor derives one from {@code options}.
 *
 * @param options catalog options; must not be null (checked in the constructor)
 * @param preferIOLoader loader tried first when resolving a FileIO
 * @param fallbackIOLoader loader used when the preferred one cannot serve the path
 * @param catalogName logical name of the catalog this context belongs to, or null if unknown
 */
public static CatalogContext create(
        Options options,
        FileIOLoader preferIOLoader,
        FileIOLoader fallbackIOLoader,
        @Nullable String catalogName) {
    return new CatalogContext(options, null, preferIOLoader, fallbackIOLoader, catalogName);
}

public static CatalogContext create(
Options options,
Configuration hadoopConf,
FileIOLoader preferIOLoader,
FileIOLoader fallbackIOLoader) {
return new CatalogContext(options, hadoopConf, preferIOLoader, fallbackIOLoader);
FileIOLoader fallbackIOLoader,
@Nullable String catalogName) {
return new CatalogContext(
options, hadoopConf, preferIOLoader, fallbackIOLoader, catalogName);
}

public Options options() {
Expand All @@ -111,4 +129,9 @@ public FileIOLoader preferIO() {
public FileIOLoader fallbackIO() {
return fallbackIOLoader;
}

/** Returns the catalog name this context was created with, or {@code null} if none was given. */
@Nullable
public String catalogName() {
    return catalogName;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,11 @@ public void configure(CatalogContext context) {
options.set(RESOLVING_FILE_IO_ENABLED, false);
this.context =
CatalogContext.create(
options, context.hadoopConf(), context.preferIO(), context.fallbackIO());
options,
context.hadoopConf(),
context.preferIO(),
context.fallbackIO(),
context.catalogName());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,8 @@ public FileIO fileIO() throws IOException {
options,
catalogContext.hadoopConf(),
catalogContext.preferIO(),
catalogContext.fallbackIO());
catalogContext.fallbackIO(),
catalogContext.catalogName());
try {
fileIO = FileIO.get(path, context);
} catch (IOException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,12 @@ protected AbstractCatalog(FileIO fileIO, CatalogContext context) {
this.context = context;
}

/**
 * Returns this catalog's name as recorded in its {@code CatalogContext}, or {@code null} when
 * the context was created without a name.
 */
@Nullable
@Override
public String name() {
    return context.catalogName();
}

@Override
public Map<String, String> options() {
return context.options().toMap();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1166,6 +1166,10 @@ TableQueryAuthResult authTableQuery(Identifier identifier, @Nullable List<String

// ==================== Catalog Information ==========================

/** The name of this catalog. */
@Nullable
String name();
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we just store this name to SparkTable?


/** Catalog options for re-creating this catalog. */
Map<String, String> options();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,7 @@ public static Table loadTable(

CatalogEnvironment catalogEnv =
new CatalogEnvironment(
catalog.name(),
tableIdentifier,
metadata.uuid(),
isRestCatalog && metadata.isExternal() ? null : catalog.catalogLoader(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,12 @@ public Catalog wrapped() {
return wrapped;
}

/** Delegates to the wrapped catalog; returns its name, or {@code null} if the wrapped catalog has none. */
@Nullable
@Override
public String name() {
    return wrapped.name();
}

@Override
public boolean caseSensitive() {
return wrapped.caseSensitive();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,11 +113,17 @@ public RESTCatalog(CatalogContext context, boolean configRequired) {
api.options(),
context.hadoopConf(),
context.preferIO(),
context.fallbackIO());
context.fallbackIO(),
context.catalogName());
this.dataTokenEnabled = api.options().get(RESTTokenFileIO.DATA_TOKEN_ENABLED);
this.tableDefaultOptions = CatalogUtils.tableDefaultOptions(this.context.options().toMap());
}

// NOTE(review): unlike the other implementations, this override carries no @Nullable annotation
// even though context.catalogName() may return null — consider annotating for consistency.
@Override
public String name() {
    return context.catalogName();
}

@Override
public Map<String, String> options() {
return context.options().toMap();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,9 @@
/** Catalog environment in table which contains log factory, metastore client factory. */
public class CatalogEnvironment implements Serializable {

private static final long serialVersionUID = 2L;
private static final long serialVersionUID = 3L;

@Nullable private final String catalogName;
@Nullable private final Identifier identifier;
@Nullable private final String uuid;
@Nullable private final CatalogLoader catalogLoader;
Expand All @@ -55,6 +56,7 @@ public class CatalogEnvironment implements Serializable {
private final boolean supportsPartitionModification;

public CatalogEnvironment(
@Nullable String catalogName,
@Nullable Identifier identifier,
@Nullable String uuid,
@Nullable CatalogLoader catalogLoader,
Expand All @@ -63,6 +65,7 @@ public CatalogEnvironment(
@Nullable CatalogContext catalogContext,
boolean supportsVersionManagement,
boolean supportsPartitionModification) {
this.catalogName = catalogName;
this.identifier = identifier;
this.uuid = uuid;
this.catalogLoader = catalogLoader;
Expand All @@ -74,7 +77,12 @@ public CatalogEnvironment(
}

/**
 * Returns an environment with no catalog association: all nullable fields are {@code null} and
 * both capability flags (version management, partition modification) are {@code false}.
 */
public static CatalogEnvironment empty() {
    return new CatalogEnvironment(null, null, null, null, null, null, null, false, false);
}

/** Returns the name of the catalog this table came from, or {@code null} if not associated with one. */
@Nullable
public String catalogName() {
    return catalogName;
}

@Nullable
Expand Down Expand Up @@ -173,6 +181,7 @@ public CatalogContext catalogContext() {

public CatalogEnvironment copy(Identifier identifier) {
return new CatalogEnvironment(
catalogName,
identifier,
uuid,
catalogLoader,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ public void testContextSerializable() throws IOException, ClassNotFoundException
conf.set("my_key", "my_value");
CatalogContext context =
CatalogContext.create(
new Options(), conf, new TestFileIOLoader(), new TestFileIOLoader());
new Options(), conf, new TestFileIOLoader(), new TestFileIOLoader(), null);
context = InstantiationUtil.clone(context);
assertThat(context.hadoopConf().get("my_key")).isEqualTo(conf.get("my_key"));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ public void close() throws Exception {}
};

CatalogEnvironment env =
new CatalogEnvironment(null, null, null, null, null, null, false, false) {
new CatalogEnvironment(null, null, null, null, null, null, null, false, false) {

@Override
public PartitionModification partitionModification() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2663,6 +2663,7 @@ private FileStoreTable getFileTable(Identifier identifier) {
TableSchema schema = tableMetadata.schema();
CatalogEnvironment catalogEnv =
new CatalogEnvironment(
catalog.name(),
identifier,
tableMetadata.uuid(),
catalog.catalogLoader(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,10 @@ public FlinkCatalog createCatalog(Context context) {
return createCatalog(
context.getName(),
CatalogContext.create(
Options.fromMap(context.getOptions()), new FlinkFileIOLoader()),
Options.fromMap(context.getOptions()),
new FlinkFileIOLoader(),
null,
context.getName()),
context.getClassLoader());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,8 @@ public static FlinkGenericCatalog createCatalog(
FlinkCatalog paimon =
new FlinkCatalog(
org.apache.paimon.catalog.CatalogFactory.createCatalog(
CatalogContext.create(options, new FlinkFileIOLoader()), cl),
CatalogContext.create(options, new FlinkFileIOLoader(), null, name),
cl),
name,
options.get(DEFAULT_DATABASE),
options);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,8 @@ public void initialize(String name, CaseInsensitiveStringMap options) {
CatalogContext catalogContext =
CatalogContext.create(
Options.fromMap(options.asCaseSensitiveMap()),
sparkSession.sessionState().newHadoopConf());
sparkSession.sessionState().newHadoopConf(),
name);
this.catalog = CatalogFactory.createCatalog(catalogContext);
this.defaultDatabase =
options.getOrDefault(DEFAULT_DATABASE.key(), DEFAULT_DATABASE.defaultValue());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

package org.apache.paimon.spark

import org.apache.paimon.table.Table
import org.apache.paimon.table.{FileStoreTable, Table}
import org.apache.paimon.utils.StringUtils

import org.apache.spark.sql.connector.catalog.TableCapability
Expand All @@ -37,7 +37,7 @@ abstract class BaseTable

override def capabilities(): JSet[TableCapability] = JCollections.emptySet[TableCapability]()

override def name: String = table.fullName
override def name: String = BaseTable.tableNameWithCatalog(table)

override lazy val schema: StructType = SparkTypeUtils.fromPaimonRowType(table.rowType)

Expand All @@ -48,6 +48,20 @@ abstract class BaseTable
override def properties: JMap[String, String] = table.options()

override def toString: String = {
s"${table.getClass.getSimpleName}[${table.fullName()}]"
s"${table.getClass.getSimpleName}[$name]"
}
}

object BaseTable {

  /**
   * Builds a display name for the table: the table's full name, prefixed with
   * "catalogName." when the table is a [[FileStoreTable]] whose catalog environment
   * carries a non-null catalog name; otherwise just the full name.
   */
  def tableNameWithCatalog(table: Table): String = {
    val qualifiedName = table.fullName
    table match {
      case fileStoreTable: FileStoreTable =>
        val catalogName = fileStoreTable.catalogEnvironment().catalogName()
        if (catalogName == null) {
          qualifiedName
        } else {
          s"$catalogName.$qualifiedName"
        }
      case _ => qualifiedName
    }
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.paimon.spark.execution

import org.apache.paimon.partition.PartitionPredicate
import org.apache.paimon.spark.BaseTable
import org.apache.paimon.spark.leafnode.PaimonLeafV2CommandExec
import org.apache.paimon.table.{FileStoreTable, Table}
import org.apache.paimon.utils.InternalRowPartitionComputer
Expand Down Expand Up @@ -69,7 +70,7 @@ case class TruncatePaimonTableWithFilterExec(
override def output: Seq[Attribute] = Nil

override def simpleString(maxFields: Int): String = {
s"TruncatePaimonTableWithFilterExec: ${table.fullName()}" +
s"TruncatePaimonTableWithFilterExec: ${BaseTable.tableNameWithCatalog(table)}" +
partitionPredicate.map(p => s", PartitionPredicate: [$p]").getOrElse("")
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import org.apache.paimon.CoreOptions
import org.apache.paimon.partition.PartitionPredicate
import org.apache.paimon.predicate.{Predicate, TopN, VectorSearch}
import org.apache.paimon.spark.{PaimonBatch, PaimonInputPartition, PaimonNumSplitMetric, PaimonPartitionSizeMetric, PaimonReadBatchTimeMetric, PaimonResultedTableFilesMetric, PaimonResultedTableFilesTaskMetric, SparkTypeUtils}
import org.apache.paimon.spark.BaseTable
import org.apache.paimon.spark.schema.PaimonMetadataColumn
import org.apache.paimon.spark.schema.PaimonMetadataColumn._
import org.apache.paimon.spark.util.{OptionUtils, SplitUtils}
Expand Down Expand Up @@ -182,7 +183,7 @@ trait BaseScan extends Scan with SupportsReportStatistics with Logging {
} else {
""
}
s"${getClass.getSimpleName}: [${table.name}]" +
s"${getClass.getSimpleName}: [${BaseTable.tableNameWithCatalog(table)}]" +
pushedPartitionFiltersStr +
pushedRuntimePartitionFiltersStr +
pushedDataFiltersStr +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.paimon.spark.read

import org.apache.paimon.partition.PartitionPredicate
import org.apache.paimon.spark.BaseTable
import org.apache.paimon.table.Table

import org.apache.spark.sql.catalyst.InternalRow
Expand All @@ -39,6 +40,6 @@ case class PaimonLocalScan(
} else {
""
}
s"PaimonLocalScan: [${table.name}]" + pushedPartitionFiltersStr
s"PaimonLocalScan: [${BaseTable.tableNameWithCatalog(table)}]" + pushedPartitionFiltersStr
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ class PaimonV2Write(
case Some(_) if !overwriteDynamic => ", overwriteTable=true"
case _ => ""
}
s"PaimonWrite(table=${table.fullName()}$overwriteDynamicStr$overwritePartitionsStr)"
s"PaimonWrite(table=${BaseTable.tableNameWithCatalog(table)}$overwriteDynamicStr$overwritePartitionsStr)"
}

override def description(): String = toString
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
package org.apache.paimon.spark.write

import org.apache.paimon.options.Options
import org.apache.paimon.spark.SaveMode
import org.apache.paimon.spark.{BaseTable, SaveMode}
import org.apache.paimon.spark.commands.WriteIntoPaimonTable
import org.apache.paimon.table.FileStoreTable

Expand All @@ -38,6 +38,6 @@ class PaimonWrite(val table: FileStoreTable, saveMode: SaveMode, options: Option
}

override def toString: String = {
s"table: ${table.fullName()}, saveMode: $saveMode, options: ${options.toMap}"
s"table: ${BaseTable.tableNameWithCatalog(table)}, saveMode: $saveMode, options: ${options.toMap}"
}
}
Loading