[AURON #1994] Support HiveTableScanExec to native #1995
@@ -0,0 +1,53 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.spark.sql.hive.execution.auron.plan

import org.apache.spark.internal.Logging
import org.apache.spark.sql.auron.AuronConverters.getBooleanConf
import org.apache.spark.sql.auron.{AuronConvertProvider, AuronConverters}
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.hive.execution.HiveTableScanExec

class HiveConvertProvider extends AuronConvertProvider with Logging {
  override def isEnabled: Boolean =
    getBooleanConf("spark.auron.enable.hiveTable", defaultValue = true)

  def enableHiveTableScanExec: Boolean =
    getBooleanConf("spark.auron.enable.hiveTableScanExec", defaultValue = false)
Comment on lines +27 to +31
Contributor: Should we be adding these to SparkAuronConfiguration.java?
  override def isSupported(exec: SparkPlan): Boolean =
    exec match {
      case e: HiveTableScanExec if enableHiveTableScanExec &&
          e.relation.tableMeta.provider.isDefined &&
          e.relation.tableMeta.provider.get.equals("hive") =>
        true
      case _ => false
    }
Comment on lines +33 to +40
Contributor: Slightly hard to distinguish when to use …
  override def convert(exec: SparkPlan): SparkPlan = {
    exec match {
      case hiveExec: HiveTableScanExec if enableHiveTableScanExec =>
        convertHiveTableScanExec(hiveExec)
      case _ => exec
    }
  }

  def convertHiveTableScanExec(hiveExec: HiveTableScanExec): SparkPlan = {
    AuronConverters.addRenameColumnsExec(NativeHiveTableScanExec(hiveExec))
  }
}
@@ -0,0 +1,271 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.spark.sql.hive.execution.auron.plan

import org.apache.auron.metric.SparkMetricNode
import org.apache.auron.{protobuf => pb}
import org.apache.hadoop.conf.Configurable
import org.apache.hadoop.hive.ql.exec.Utilities
import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat
import org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat
import org.apache.hadoop.hive.ql.metadata.{Table => HiveTable}
import org.apache.hadoop.hive.ql.plan.TableDesc
import org.apache.hadoop.hive.serde.serdeConstants
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption
import org.apache.hadoop.hive.serde2.objectinspector.{ObjectInspectorUtils, StructObjectInspector}
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils
import org.apache.hadoop.io.Writable
import org.apache.hadoop.mapred.{FileSplit, InputFormat, JobConf}
import org.apache.hadoop.mapreduce.{InputFormat => newInputClass}
import org.apache.hadoop.util.ReflectionUtils
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config.HADOOP_RDD_IGNORE_EMPTY_SPLITS
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.auron.{NativeRDD, Shims}
import org.apache.spark.sql.catalyst.expressions.{AttributeMap, GenericInternalRow}
import org.apache.spark.sql.execution.datasources.{FilePartition, PartitionedFile}
import org.apache.spark.sql.hive.client.HiveClientImpl
import org.apache.spark.sql.hive.execution.HiveTableScanExec
import org.apache.spark.sql.hive.{HadoopTableReader, HiveShim}
import org.apache.spark.{Partition, TaskContext}

import java.util.UUID
import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer

case class NativeHiveTableScanExec(basedHiveScan: HiveTableScanExec)
    extends NativeHiveTableScanBase(basedHiveScan)
    with Logging {

  @transient private lazy val nativeTable: HiveTable = HiveClientImpl.toHiveTable(relation.tableMeta)
  @transient private lazy val fileFormat = HiveTableUtil.getFileFormat(nativeTable.getInputFormatClass)
  @transient private lazy val nativeTableDesc = new TableDesc(
    nativeTable.getInputFormatClass,
    nativeTable.getOutputFormatClass,
    nativeTable.getMetadata)

  @transient private lazy val nativeHadoopConf = {
    val hiveConf = SparkSession.getActiveSession.get.sessionState.newHadoopConf()
Suggested change:
-    val hiveConf = SparkSession.getActiveSession.get.sessionState.newHadoopConf()
+    val sparkSession = Shims.get.getSqlContext(basedHiveScan).sparkSession
+    val hiveConf = sparkSession.sessionState.newHadoopConf()
Copilot AI (Feb 11, 2026):
minPartitions reads SparkSession.getActiveSession.get.sparkContext multiple times. Besides the .get risk, it's also inconsistent with other native scan implementations in this repo, which pass an explicit sparkSession around. Prefer resolving a single sparkSession from basedHiveScan and deriving sparkContext from it.
Copilot AI (Feb 11, 2026):
ignoreEmptySplits also depends on SparkSession.getActiveSession.get. This should use the same non-optional session/context resolution as the rest of the execution code to avoid NoSuchElementException when there is no active session.
Suggested change:
-  private val minPartitions = if (SparkSession.getActiveSession.get.sparkContext.isLocal) {
-    0 // will splitted based on block by default.
-  } else {
-    math.max(nativeHadoopConf.getInt("mapreduce.job.maps", 1),
-      SparkSession.getActiveSession.get.sparkContext.defaultMinPartitions)
-  }
-  private val ignoreEmptySplits =
-    SparkSession.getActiveSession.get.sparkContext.conf.get(HADOOP_RDD_IGNORE_EMPTY_SPLITS)
+  private val minPartitions = if (sparkContext.isLocal) {
+    0 // will splitted based on block by default.
+  } else {
+    math.max(nativeHadoopConf.getInt("mapreduce.job.maps", 1),
+      sparkContext.defaultMinPartitions)
+  }
+  private val ignoreEmptySplits =
+    sparkContext.conf.get(HADOOP_RDD_IGNORE_EMPTY_SPLITS)
So we will do a full table scan here? If that's the case, do you mind creating an issue and linking it here?
Copilot AI (Feb 11, 2026):
fileFormat match only has cases for "orc" and "parquet". Since HiveTableUtil.getFileFormat can return "other", this can throw a MatchError at runtime. Add a default case that either throws a clear unsupported-format error or triggers a non-native fallback.
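A minimal sketch of such a default case, assuming the "orc"/"parquet"/"other" strings returned by HiveTableUtil.getFileFormat; the helper name and error message below are illustrative, not part of the PR:

// Hypothetical helper: validate the Hive input format up front so an
// unsupported format fails with a clear error instead of a MatchError.
def checkedFileFormat(fileFormat: String): String = fileFormat match {
  case "orc" | "parquet" => fileFormat
  case other =>
    // A non-native fallback would be the alternative to failing fast here.
    throw new UnsupportedOperationException(
      s"NativeHiveTableScanExec: unsupported Hive input format '$other'")
}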
Copilot
AI
Feb 11, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Partition values are populated into GenericInternalRow using raw strings from Hive metastore (partition.getTPartition.getValues). Spark scan planning usually casts these strings to the partition schema types (and handles DEFAULT_PARTITION_NAME -> null / time zones). Without that casting, partition values/types may be incorrect. Consider building the partition InternalRow via the same cast/toRow approach used in NativePaimonTableScanExec (or existing Spark/Hive utilities).
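A rough sketch of the casting the comment has in mind, built on Spark's Cast expression; the method name, the defaultPartitionName parameter, and the time-zone handling are assumptions for illustration, not code from this PR:

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Cast, Literal}
import org.apache.spark.sql.types.StructType

// Hypothetical helper: cast raw metastore partition strings to the partition
// schema types instead of storing them as strings in a GenericInternalRow.
def toPartitionRow(
    rawValues: Seq[String],
    partitionSchema: StructType,
    defaultPartitionName: String,
    timeZoneId: String): InternalRow = {
  val casted = rawValues.zip(partitionSchema.fields).map { case (raw, field) =>
    if (raw == defaultPartitionName) {
      null // Hive's default partition name maps to a null partition value
    } else {
      Cast(Literal(raw), field.dataType, Option(timeZoneId)).eval()
    }
  }
  InternalRow.fromSeq(casted)
}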
Copilot AI (Feb 11, 2026):
In getFilePartitions, arrayPartitionedFile is an ArrayBuffer[PartitionedFile], but the code uses += getArrayPartitionedFile(...), where getArrayPartitionedFile returns an ArrayBuffer[PartitionedFile]. This is a type mismatch and won't compile; use ++= (or change the helper to return a single PartitionedFile).
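A small sketch of the ++= semantics, assuming getArrayPartitionedFile keeps returning an ArrayBuffer[PartitionedFile]; the helper below only illustrates the append behavior and is not the PR's code:

import scala.collection.mutable.ArrayBuffer
import org.apache.spark.sql.execution.datasources.PartitionedFile

// Hypothetical illustration: ++= appends the elements of another buffer,
// whereas += would expect a single PartitionedFile element.
def collectPartitionedFiles(
    buffers: Seq[ArrayBuffer[PartitionedFile]]): ArrayBuffer[PartitionedFile] = {
  val arrayPartitionedFile = ArrayBuffer.empty[PartitionedFile]
  buffers.foreach(arrayPartitionedFile ++= _)
  arrayPartitionedFile
}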
Copilot AI (Feb 11, 2026):
FilePartition.getFilePartitions(SparkSession.getActiveSession.get, ...) again relies on getActiveSession.get. Use the sparkSession derived from basedHiveScan (as in NativeHiveTableScanBase) so partition planning doesn't fail when there's no active session.
Suggested change:
-    arrayFilePartition += FilePartition.getFilePartitions(SparkSession.getActiveSession.get,
-      partitionedFiles,
-      getMaxSplitBytes(SparkSession.getActiveSession.get)).toArray
+    val sparkSession = basedHiveScan.sparkSession
+    arrayFilePartition += FilePartition.getFilePartitions(
+      sparkSession,
+      partitionedFiles,
+      getMaxSplitBytes(sparkSession)
+    ).toArray
Copilot AI (Feb 11, 2026):
getMaxSplitBytes currently returns min(filesMaxPartitionBytes, filesOpenCostInBytes), which can drastically shrink splits and create excessive partitions. Elsewhere in this repo (NativePaimonTableScanExec) you fork Spark's FilePartition#maxSplitBytes logic using min(defaultMaxSplitBytes, max(openCostInBytes, bytesPerCore)). Align this implementation to that logic (or call the shared helper) to avoid performance regressions.
Suggested change:
-      getMaxSplitBytes(SparkSession.getActiveSession.get)).toArray
-    arrayFilePartition.toArray
-  }
-  private def getMaxSplitBytes(sparkSession: SparkSession): Long = {
-    val defaultMaxSplitBytes = sparkSession.sessionState.conf.filesMaxPartitionBytes
-    val openCostInBytes = sparkSession.sessionState.conf.filesOpenCostInBytes
-    Math.min(defaultMaxSplitBytes, openCostInBytes)
+      getMaxSplitBytes(SparkSession.getActiveSession.get, partitionedFiles)).toArray
+    arrayFilePartition.toArray
+  }
+  private def getMaxSplitBytes(
+      sparkSession: SparkSession,
+      partitionedFiles: Seq[PartitionedFile]): Long = {
+    val defaultMaxSplitBytes = sparkSession.sessionState.conf.filesMaxPartitionBytes
+    val openCostInBytes = sparkSession.sessionState.conf.filesOpenCostInBytes
+    val totalBytes = partitionedFiles.map(_.length).sum
+    val parallelism = math.max(1, sparkSession.sparkContext.defaultParallelism)
+    val bytesPerCore = if (totalBytes <= 0L) openCostInBytes else totalBytes / parallelism
+    Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore))
Copilot AI (Feb 11, 2026):
getArrayPartitionedFile declares inputFormatClass as a Hadoop mapreduce.InputFormat (newInputClass), but the implementation instantiates/casts it to the old mapred.InputFormat and uses JobConf/getSplits(JobConf, Int). This API mismatch is unsafe (compile-time and runtime). Use a consistent InputFormat API (likely org.apache.hadoop.mapred.InputFormat, given the JobConf/FileSplit usage).
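A sketch of keeping the whole path on the old mapred API, which is what the JobConf/FileSplit usage in this PR already implies; the method name and signature below are assumptions for illustration:

import org.apache.hadoop.conf.Configurable
import org.apache.hadoop.mapred.{InputFormat, InputSplit, JobConf}
import org.apache.hadoop.util.ReflectionUtils

// Hypothetical helper: instantiate and drive the input format through the
// mapred API only, so the class parameter matches getSplits(JobConf, Int).
def getSplitsWithMapredApi(
    inputFormatClass: Class[_ <: InputFormat[_, _]],
    jobConf: JobConf,
    minPartitions: Int): Array[InputSplit] = {
  val inputFormat = ReflectionUtils
    .newInstance(inputFormatClass, jobConf)
    .asInstanceOf[InputFormat[_, _]]
  inputFormat match {
    case c: Configurable => c.setConf(jobConf) // mirrors Spark's HadoopRDD handling
    case _ =>
  }
  inputFormat.getSplits(jobConf, minPartitions)
}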
Copilot AI (Feb 11, 2026):
inputFormatClass match { case OrcInputFormat => ... } is matching a Class[_] value against a class name, and the cases are empty. If you need special handling by input format, compare against classOf[OrcInputFormat] / classOf[MapredParquetInputFormat] and implement the intended behavior; otherwise remove this dead code block.
Suggested change:
-    inputFormatClass match {
-      case OrcInputFormat =>
-      case MapredParquetInputFormat =>
-      case _ =>
-    }
Copilot AI (Feb 11, 2026):
inputSplit match { case FileSplit => ... } won't work: FileSplit is a class, not a matchable singleton. Use a typed pattern like case fs: FileSplit => and avoid re-casting the same value immediately after matching.
Suggested change:
-        case FileSplit =>
-          val orcInputSplit = inputSplit.asInstanceOf[FileSplit]
+        case orcInputSplit: FileSplit =>
Copilot AI (Feb 11, 2026):
getInputFormat takes a mapreduce.InputFormat class (newInputClass) but returns/instantiates org.apache.hadoop.mapred.InputFormat. This signature mismatch makes the unchecked cast even riskier. Align the parameter type with the returned InputFormat type (or vice versa) so the compiler can help enforce correctness.
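A possible signature for that alignment, again assuming the old mapred API is kept throughout; the method name matches the PR, but the body is only a sketch complementing the getSplits example above:

import org.apache.hadoop.mapred.{InputFormat => MapredInputFormat, JobConf}
import org.apache.hadoop.util.ReflectionUtils

// Hypothetical signature: parameter and return type use the same mapred
// InputFormat, so no unchecked cross-API cast is needed.
def getInputFormat(
    inputFormatClass: Class[_ <: MapredInputFormat[_, _]],
    conf: JobConf): MapredInputFormat[_, _] = {
  ReflectionUtils
    .newInstance(inputFormatClass, conf)
    .asInstanceOf[MapredInputFormat[_, _]]
}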
spark-hive_${scalaVersion} is declared twice: once with the default (compile) scope and again with <scope>provided</scope>. This can unintentionally bundle Spark Hive classes into this module's artifact and/or cause dependency resolution conflicts. Keep a single dependency entry with the intended scope.