From 7a54f839946445996fe2ed8e741ff006040c9d36 Mon Sep 17 00:00:00 2001
From: guihuawen
Date: Mon, 9 Feb 2026 22:28:54 +0800
Subject: [PATCH 1/2] [AURON #1994] Support converting HiveTableScanExec to native

---
 spark-extension-shims-spark/pom.xml           |   9 +
 .../auron/plan/NativeHIveTableScanExec.scala  | 263 ++++++++++++++++++
 2 files changed, 272 insertions(+)
 create mode 100644 spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeHIveTableScanExec.scala

diff --git a/spark-extension-shims-spark/pom.xml b/spark-extension-shims-spark/pom.xml
index 4c75845bf..9fc1280c3 100644
--- a/spark-extension-shims-spark/pom.xml
+++ b/spark-extension-shims-spark/pom.xml
@@ -36,6 +36,15 @@
       <artifactId>auron-spark-ui_${scalaVersion}</artifactId>
       <version>${project.version}</version>
     </dependency>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-hive_${scalaVersion}</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-catalyst_${scalaVersion}</artifactId>
+      <scope>provided</scope>
+    </dependency>
     <dependency>
       <groupId>org.apache.auron</groupId>
      <artifactId>auron-common_${scalaVersion}</artifactId>
diff --git a/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeHIveTableScanExec.scala b/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeHIveTableScanExec.scala
new file mode 100644
index 000000000..5fc475911
--- /dev/null
+++ b/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeHIveTableScanExec.scala
@@ -0,0 +1,263 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.hive.execution.auron.plan
+
+import scala.collection.JavaConverters._
+import org.apache.auron.metric.SparkMetricNode
+import org.apache.auron.{protobuf => pb}
+import org.apache.hadoop.conf.Configurable
+import org.apache.hadoop.hive.ql.exec.Utilities
+import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat
+import org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat
+import org.apache.hadoop.hive.ql.metadata.{Partition => HivePartition, Table => HiveTable}
+import org.apache.hadoop.hive.ql.plan.TableDesc
+import org.apache.hadoop.hive.serde.serdeConstants
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption
+import org.apache.hadoop.hive.serde2.objectinspector.{ObjectInspectorUtils, StructObjectInspector}
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils
+import org.apache.hadoop.io.Writable
+import org.apache.hadoop.mapred.{FileSplit, InputFormat, JobConf}
+import org.apache.hadoop.mapreduce.{InputFormat => newInputClass}
+import org.apache.hadoop.util.ReflectionUtils
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.auron.NativeRDD
+import org.apache.spark.sql.execution.datasources.{FilePartition, PartitionedFile}
+import org.apache.spark.sql.hive.client.HiveClientImpl
+import org.apache.spark.sql.hive.execution.HiveTableScanExec
+import org.apache.spark.{Partition, TaskContext}
+import org.apache.spark.internal.config.HADOOP_RDD_IGNORE_EMPTY_SPLITS
+import org.apache.spark.paths.SparkPath
+import org.apache.spark.sql.catalyst.expressions.{AttributeMap, GenericInternalRow, SpecificInternalRow}
+import org.apache.spark.sql.hive.{HadoopTableReader, HiveShim}
+
+import java.util.UUID
+import scala.collection.mutable.ArrayBuffer
+
+case class NativeHiveTableScanExec(basedHiveScan: HiveTableScanExec)
+    extends NativeHiveTableScanBase(basedHiveScan)
+    with Logging {
+
+  @transient private lazy val nativeTable: HiveTable = HiveClientImpl.toHiveTable(relation.tableMeta)
+  @transient private lazy val fileFormat = HiveTableUtil.getFileFormat(nativeTable.getInputFormatClass)
+  @transient private lazy val nativeTableDesc = new TableDesc(
+    nativeTable.getInputFormatClass,
+    nativeTable.getOutputFormatClass,
+    nativeTable.getMetadata)
+
+  @transient private lazy val nativeHadoopConf = {
+    val hiveConf = SparkSession.getActiveSession.get.sessionState.newHadoopConf()
+    // append column ids and names before broadcast
+    val columnOrdinals = AttributeMap(relation.dataCols.zipWithIndex)
+    val neededColumnIDs = output.flatMap(columnOrdinals.get).map(o => o: Integer)
+    val neededColumnNames = output.filter(columnOrdinals.contains).map(_.name)
+
+    HiveShim.appendReadColumns(hiveConf, neededColumnIDs, neededColumnNames)
+
+    val deserializer = nativeTableDesc.getDeserializerClass.getConstructor().newInstance()
+    deserializer.initialize(hiveConf, nativeTableDesc.getProperties)
+
+    // Specifies types and object inspectors of columns to be scanned.
+    val structOI = ObjectInspectorUtils
+      .getStandardObjectInspector(
+        deserializer.getObjectInspector,
+        ObjectInspectorCopyOption.JAVA)
+      .asInstanceOf[StructObjectInspector]
+
+    val columnTypeNames = structOI
+      .getAllStructFieldRefs.asScala
+      .map(_.getFieldObjectInspector)
+      .map(TypeInfoUtils.getTypeInfoFromObjectInspector(_).getTypeName)
+      .mkString(",")
+
+    hiveConf.set(serdeConstants.LIST_COLUMN_TYPES, columnTypeNames)
+    hiveConf.set(serdeConstants.LIST_COLUMNS, relation.dataCols.map(_.name).mkString(","))
+    hiveConf
+  }
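// --- Editor's note (illustrative sketch, not part of this patch) --------------------------
// The lazy val above prunes columns through HiveShim.appendReadColumns and then records the
// serde schema on the conf. A minimal sketch of the Hadoop conf keys this ends up setting;
// the key names come from Hive's ColumnProjectionUtils / serdeConstants, while the helper
// and the example values in the comments are hypothetical:
def describeScanConf(conf: org.apache.hadoop.conf.Configuration): String =
  Seq(
    "hive.io.file.readcolumn.ids",   // needed column ordinals, e.g. "0,2"
    "hive.io.file.readcolumn.names", // needed column names, e.g. "id,price"
    "columns",                       // serdeConstants.LIST_COLUMNS: all data column names
    "columns.types"                  // serdeConstants.LIST_COLUMN_TYPES: matching Hive type names
  ).map(k => s"$k=${conf.get(k)}").mkString("\n")
// -------------------------------------------------------------------------------------------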
+
+  private val minPartitions = if (SparkSession.getActiveSession.get.sparkContext.isLocal) {
+    0 // will be split based on blocks by default.
+  } else {
+    math.max(nativeHadoopConf.getInt("mapreduce.job.maps", 1),
+      SparkSession.getActiveSession.get.sparkContext.defaultMinPartitions)
+  }
+
+  private val ignoreEmptySplits =
+    SparkSession.getActiveSession.get.sparkContext.conf.get(HADOOP_RDD_IGNORE_EMPTY_SPLITS)
+
+  override val nodeName: String =
+    s"NativeHiveTableScan $tableName"
+
+  override def doExecuteNative(): NativeRDD = {
+    val nativeMetrics = SparkMetricNode(
+      metrics,
+      Nil,
+      Some({
+        case ("bytes_scanned", v) =>
+          val inputMetric = TaskContext.get.taskMetrics().inputMetrics
+          inputMetric.incBytesRead(v)
+        case ("output_rows", v) =>
+          val inputMetric = TaskContext.get.taskMetrics().inputMetrics
+          inputMetric.incRecordsRead(v)
+        case _ =>
+      }))
+    val nativeFileSchema = this.nativeFileSchema
+    val nativeFileGroups = this.nativeFileGroups
+    val nativePartitionSchema = this.nativePartitionSchema
+
+    val projection = schema.map(field => relation.schema.fieldIndex(field.name))
+    val broadcastedHadoopConf = this.broadcastedHadoopConf
+    val numPartitions = partitions.length
+
+    new NativeRDD(
+      sparkContext,
+      nativeMetrics,
+      partitions.asInstanceOf[Array[Partition]],
+      None,
+      Nil,
+      rddShuffleReadFull = true,
+      (partition, _) => {
+        val resourceId = s"NativeHiveTableScan:${UUID.randomUUID().toString}"
+        putJniBridgeResource(resourceId, broadcastedHadoopConf)
+
+        val nativeFileGroup = nativeFileGroups(partition.asInstanceOf[FilePartition])
+        val nativeFileScanConf = pb.FileScanExecConf
+          .newBuilder()
+          .setNumPartitions(numPartitions)
+          .setPartitionIndex(partition.index)
+          .setStatistics(pb.Statistics.getDefaultInstance)
+          .setSchema(nativeFileSchema)
+          .setFileGroup(nativeFileGroup)
+          .addAllProjection(projection.map(Integer.valueOf).asJava)
+          .setPartitionSchema(nativePartitionSchema)
+          .build()
+        fileFormat match {
+          case "orc" =>
+            val nativeOrcScanExecBuilder = pb.OrcScanExecNode
+              .newBuilder()
+              .setBaseConf(nativeFileScanConf)
+              .setFsResourceId(resourceId)
+              .addAllPruningPredicates(new java.util.ArrayList()) // pruning predicates are not supported yet
+            pb.PhysicalPlanNode
+              .newBuilder()
+              .setOrcScan(nativeOrcScanExecBuilder.build())
+              .build()
+          case "parquet" =>
+            val nativeParquetScanExecBuilder = pb.ParquetScanExecNode
+              .newBuilder()
+              .setBaseConf(nativeFileScanConf)
+              .setFsResourceId(resourceId)
+              .addAllPruningPredicates(new java.util.ArrayList()) // pruning predicates are not supported yet
+
+            pb.PhysicalPlanNode
+              .newBuilder()
+              .setParquetScan(nativeParquetScanExecBuilder.build())
+              .build()
+        }
+      },
+      friendlyName = "NativeRDD.HiveScan")
+  }
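// --- Editor's note (illustrative sketch, not part of this patch) --------------------------
// The match above only covers "orc" and "parquet", but HiveTableUtil.getFileFormat can also
// return "other" (e.g. text or SequenceFile tables), which would surface as a MatchError at
// execution time. A minimal sketch of a defensive check that could run during planning; the
// helper name and message are hypothetical:
def requireSupportedHiveFormat(fileFormat: String): Unit = fileFormat match {
  case "orc" | "parquet" => // handled by OrcScanExecNode / ParquetScanExecNode above
  case other =>
    throw new UnsupportedOperationException(
      s"Native Hive scan supports only ORC and Parquet input formats, got: $other")
}
// -------------------------------------------------------------------------------------------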
+  override def getFilePartitions(): Array[FilePartition] = {
+    val newJobConf = new JobConf(nativeHadoopConf)
+    var indexPartition = 0
+    val arrayFilePartition = ArrayBuffer[FilePartition]()
+    if (relation.isPartitioned) {
+      val partitions = basedHiveScan.prunedPartitions
+      partitions.foreach { partition =>
+        val partDesc =
+          Utilities.getPartitionDescFromTableDesc(nativeTableDesc, partition, true)
+        val partPath = partition.getDataLocation
+        HadoopTableReader.initializeLocalJobConfFunc(partPath.toString, nativeTableDesc)(newJobConf)
+        val partitionValues = partition.getTPartition.getValues
+
+        val partitionInternalRow = new GenericInternalRow(partitionValues.size())
+        for (partitionIndex <- 0 until partitionValues.size) {
+          partitionInternalRow.update(partitionIndex, partitionValues.get(partitionIndex))
+        }
+
+        val inputFormatClass = partDesc.getInputFileFormatClass
+          .asInstanceOf[Class[newInputClass[Writable, Writable]]]
+        val (midFilePartition, midIndexPartition) =
+          getArrayFilePartition(newJobConf, inputFormatClass, indexPartition, partitionInternalRow)
+        arrayFilePartition += midFilePartition
+        indexPartition = midIndexPartition
+      }
+    } else {
+      val inputFormatClass = nativeTable.getInputFormatClass.asInstanceOf[Class[newInputClass[Writable, Writable]]]
+      val (midFilePartition, midIndexPartition) =
+        getArrayFilePartition(newJobConf, inputFormatClass, indexPartition, new GenericInternalRow(0))
+      arrayFilePartition += midFilePartition
+      indexPartition = midIndexPartition
+    }
+    arrayFilePartition.toArray
+  }
+
+  private def getArrayFilePartition(newJobConf: JobConf,
+      inputFormatClass: Class[newInputClass[Writable, Writable]],
+      indexPartition: Int,
+      partitionInternalRow: GenericInternalRow): (ArrayBuffer[FilePartition], Int) = {
+    val allInputSplits = getInputFormat(newJobConf, inputFormatClass).getSplits(newJobConf, minPartitions)
+    val inputSplits = if (ignoreEmptySplits) {
+      allInputSplits.filter(_.getLength > 0)
+    } else {
+      allInputSplits
+    }
+    inputFormatClass match {
+      case OrcInputFormat =>
+      case MapredParquetInputFormat =>
+      case _ =>
+    }
+    val arrayFilePartition = ArrayBuffer[FilePartition]()
+    var initIndexPartition = indexPartition
+    for (i <- 0 until inputSplits.size) {
+      val inputSplit = inputSplits(i)
+      inputSplit match {
+        case FileSplit =>
+          val orcInputSplit = inputSplit.asInstanceOf[FileSplit]
+          arrayFilePartition += FilePartition(indexPartition, (PartitionedFile(partitionInternalRow,
+            SparkPath.fromPathString(orcInputSplit.getPath.toString), orcInputSplit.getStart, orcInputSplit.getLength) :: Nil).toArray)
+          initIndexPartition = initIndexPartition + 1
+      }
+    }
+    (arrayFilePartition, initIndexPartition)
+  }
+
+  private def getInputFormat(conf: JobConf, inputFormatClass: Class[newInputClass[Writable, Writable]]):
+      InputFormat[Writable, Writable] = {
+    val newInputFormat = ReflectionUtils.newInstance(inputFormatClass.asInstanceOf[Class[_]], conf)
+      .asInstanceOf[InputFormat[Writable, Writable]]
+    newInputFormat match {
+      case c: Configurable => c.setConf(conf)
+      case _ =>
+    }
+    newInputFormat
+  }
+
+}
+
+object HiveTableUtil {
+  def getFileFormat(inputFormatClass: Class[_ <: InputFormat[_, _]]): String = {
+    if (inputFormatClass.getSimpleName.equalsIgnoreCase("OrcInputFormat")) {
+      "orc"
+    } else if (inputFormatClass.getSimpleName.equalsIgnoreCase("MapredParquetInputFormat")) {
+      "parquet"
+    } else {
+      "other"
+    }
+  }
+
+}
\ No newline at end of file
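Editor's note on getArrayFilePartition above: `case OrcInputFormat =>` and `case FileSplit =>` use bare Java class names as match patterns; since those classes have no Scala companion objects, such patterns will not compile, and the later `asInstanceOf[FileSplit]` cast suggests a typed pattern was intended. A hedged sketch of the usual form (the helper name is hypothetical and not part of the patch):

import org.apache.hadoop.mapred.{FileSplit, InputSplit}

def describeSplit(split: InputSplit): String = split match {
  case f: FileSplit => s"${f.getPath} [start=${f.getStart}, length=${f.getLength}]"
  case other => s"unsupported split type: ${other.getClass.getName}"
}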
From b53d84b69ef4370cf8229126ccdcfeebaf11ec61 Mon Sep 17 00:00:00 2001
From: guihuawen
Date: Tue, 10 Feb 2026 11:44:48 +0800
Subject: [PATCH 2/2] [AURON #1994] Support converting HiveTableScanExec to native

---
 .../auron/plan/HiveConvertProvider.scala      | 53 +++++++++++++
 .../auron/plan/NativeHIveTableScanExec.scala  | 66 +++++++++-------
 .../spark/sql/auron/AuronConverters.scala     | 76 ++++++-------------
 3 files changed, 114 insertions(+), 81 deletions(-)
 create mode 100644 spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/hive/execution/auron/plan/HiveConvertProvider.scala
 rename spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/{ => hive}/execution/auron/plan/NativeHIveTableScanExec.scala (84%)

diff --git a/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/hive/execution/auron/plan/HiveConvertProvider.scala b/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/hive/execution/auron/plan/HiveConvertProvider.scala
new file mode 100644
index 000000000..46588721f
--- /dev/null
+++ b/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/hive/execution/auron/plan/HiveConvertProvider.scala
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.execution.auron.plan
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.auron.AuronConverters.getBooleanConf
+import org.apache.spark.sql.auron.{AuronConvertProvider, AuronConverters}
+import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.hive.execution.HiveTableScanExec
+
+class HiveConvertProvider extends AuronConvertProvider with Logging {
+  override def isEnabled: Boolean =
+    getBooleanConf("spark.auron.enable.hiveTable", defaultValue = true)
+
+  def enableHiveTableScanExec: Boolean =
+    getBooleanConf("spark.auron.enable.hiveTableScanExec", defaultValue = false)
+
+  override def isSupported(exec: SparkPlan): Boolean =
+    exec match {
+      case e: HiveTableScanExec if enableHiveTableScanExec &&
+          e.relation.tableMeta.provider.isDefined &&
+          e.relation.tableMeta.provider.get.equals("hive") =>
+        true
+      case _ => false
+    }
+
+  override def convert(exec: SparkPlan): SparkPlan = {
+    exec match {
+      case hiveExec: HiveTableScanExec if enableHiveTableScanExec =>
+        convertHiveTableScanExec(hiveExec)
+      case _ => exec
+    }
+  }
+
+  def convertHiveTableScanExec(hiveExec: HiveTableScanExec): SparkPlan = {
+    AuronConverters.addRenameColumnsExec(NativeHiveTableScanExec(hiveExec))
+  }
+}
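Editor's note: HiveConvertProvider is the new extension hook — AuronConverters asks each registered AuronConvertProvider whether it can handle a plan node, so the Hive-specific conversion no longer lives in the core converter module. A sketch of how the contract composes (illustrative only; if providers are discovered through Java's ServiceLoader, the patch would also need a META-INF/services/org.apache.spark.sql.auron.AuronConvertProvider entry, which does not appear in this diff):

import org.apache.spark.sql.auron.AuronConvertProvider
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.hive.execution.auron.plan.HiveConvertProvider

def maybeConvert(plan: SparkPlan): SparkPlan = {
  val provider: AuronConvertProvider = new HiveConvertProvider
  if (provider.isEnabled && provider.isSupported(plan)) provider.convert(plan) else plan
}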
diff --git a/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeHIveTableScanExec.scala b/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/hive/execution/auron/plan/NativeHIveTableScanExec.scala
similarity index 84%
rename from spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeHIveTableScanExec.scala
rename to spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/hive/execution/auron/plan/NativeHIveTableScanExec.scala
index 5fc475911..f3e1d2534 100644
--- a/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeHIveTableScanExec.scala
+++ b/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/hive/execution/auron/plan/NativeHIveTableScanExec.scala
@@ -16,14 +16,13 @@
  */
 package org.apache.spark.sql.hive.execution.auron.plan
 
-import scala.collection.JavaConverters._
 import org.apache.auron.metric.SparkMetricNode
 import org.apache.auron.{protobuf => pb}
 import org.apache.hadoop.conf.Configurable
 import org.apache.hadoop.hive.ql.exec.Utilities
 import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat
 import org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat
-import org.apache.hadoop.hive.ql.metadata.{Partition => HivePartition, Table => HiveTable}
+import org.apache.hadoop.hive.ql.metadata.{Table => HiveTable}
 import org.apache.hadoop.hive.ql.plan.TableDesc
 import org.apache.hadoop.hive.serde.serdeConstants
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption
@@ -34,18 +33,18 @@
 import org.apache.hadoop.mapred.{FileSplit, InputFormat, JobConf}
 import org.apache.hadoop.mapreduce.{InputFormat => newInputClass}
 import org.apache.hadoop.util.ReflectionUtils
 import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.HADOOP_RDD_IGNORE_EMPTY_SPLITS
 import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.auron.NativeRDD
+import org.apache.spark.sql.auron.{NativeRDD, Shims}
+import org.apache.spark.sql.catalyst.expressions.{AttributeMap, GenericInternalRow}
 import org.apache.spark.sql.execution.datasources.{FilePartition, PartitionedFile}
 import org.apache.spark.sql.hive.client.HiveClientImpl
 import org.apache.spark.sql.hive.execution.HiveTableScanExec
-import org.apache.spark.{Partition, TaskContext}
-import org.apache.spark.internal.config.HADOOP_RDD_IGNORE_EMPTY_SPLITS
-import org.apache.spark.paths.SparkPath
-import org.apache.spark.sql.catalyst.expressions.{AttributeMap, GenericInternalRow, SpecificInternalRow}
 import org.apache.spark.sql.hive.{HadoopTableReader, HiveShim}
+import org.apache.spark.{Partition, TaskContext}
 
 import java.util.UUID
+import scala.collection.JavaConverters._
 import scala.collection.mutable.ArrayBuffer
@@ -169,15 +168,15 @@ case class NativeHiveTableScanExec(basedHiveScan: HiveTableScanExec)
             .build()
         }
       },
-      friendlyName = "NativeRDD.HiveScan")
+      friendlyName = "NativeRDD.HiveTableScan")
   }
   override def getFilePartitions(): Array[FilePartition] = {
     val newJobConf = new JobConf(nativeHadoopConf)
-    var indexPartition = 0
     val arrayFilePartition = ArrayBuffer[FilePartition]()
-    if (relation.isPartitioned) {
+    val partitionedFiles = if (relation.isPartitioned) {
       val partitions = basedHiveScan.prunedPartitions
+      val arrayPartitionedFile = ArrayBuffer[PartitionedFile]()
       partitions.foreach { partition =>
         val partDesc =
           Utilities.getPartitionDescFromTableDesc(nativeTableDesc, partition, true)
         val partPath = partition.getDataLocation
@@ -191,25 +190,32 @@
 
         val inputFormatClass = partDesc.getInputFileFormatClass
           .asInstanceOf[Class[newInputClass[Writable, Writable]]]
-        val (midFilePartition, midIndexPartition) =
-          getArrayFilePartition(newJobConf, inputFormatClass, indexPartition, partitionInternalRow)
-        arrayFilePartition += midFilePartition
-        indexPartition = midIndexPartition
+        arrayPartitionedFile += getArrayPartitionedFile(newJobConf, inputFormatClass, partitionInternalRow)
       }
+      arrayPartitionedFile
+        .sortBy(_.length)(implicitly[Ordering[Long]].reverse)
+        .toArray
     } else {
       val inputFormatClass = nativeTable.getInputFormatClass.asInstanceOf[Class[newInputClass[Writable, Writable]]]
-      val (midFilePartition, midIndexPartition) =
-        getArrayFilePartition(newJobConf, inputFormatClass, indexPartition, new GenericInternalRow(0))
-      arrayFilePartition += midFilePartition
-      indexPartition = midIndexPartition
+      getArrayPartitionedFile(newJobConf, inputFormatClass, new GenericInternalRow(0))
+        .sortBy(_.length)(implicitly[Ordering[Long]].reverse)
+        .toArray
     }
+    arrayFilePartition += FilePartition.getFilePartitions(SparkSession.getActiveSession.get,
+      partitionedFiles,
+      getMaxSplitBytes(SparkSession.getActiveSession.get)).toArray
     arrayFilePartition.toArray
   }
 
-  private def getArrayFilePartition(newJobConf: JobConf,
+  private def getMaxSplitBytes(sparkSession: SparkSession): Long = {
+    val defaultMaxSplitBytes = sparkSession.sessionState.conf.filesMaxPartitionBytes
+    val openCostInBytes = sparkSession.sessionState.conf.filesOpenCostInBytes
+    Math.min(defaultMaxSplitBytes, openCostInBytes)
+  }
+
+  private def getArrayPartitionedFile(newJobConf: JobConf,
       inputFormatClass: Class[newInputClass[Writable, Writable]],
-      indexPartition: Int,
-      partitionInternalRow: GenericInternalRow): (ArrayBuffer[FilePartition], Int) = {
+      partitionInternalRow: GenericInternalRow): ArrayBuffer[PartitionedFile] = {
     val allInputSplits = getInputFormat(newJobConf, inputFormatClass).getSplits(newJobConf, minPartitions)
     val inputSplits = if (ignoreEmptySplits) {
       allInputSplits.filter(_.getLength > 0)
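Editor's note on getMaxSplitBytes above: it returns min(filesMaxPartitionBytes, filesOpenCostInBytes), which with Spark's defaults collapses to the 4 MB open-cost value and can over-partition large tables. Spark's own FilePartition.maxSplitBytes also factors in the total scan size; a simplified, self-contained sketch of that formula for comparison (parameter names are illustrative, not part of the patch):

def sparkStyleMaxSplitBytes(
    filesMaxPartitionBytes: Long, // spark.sql.files.maxPartitionBytes, default 128 MB
    filesOpenCostInBytes: Long,   // spark.sql.files.openCostInBytes, default 4 MB
    totalBytes: Long,             // sum of file lengths plus one open cost per file
    minPartitionNum: Int): Long = {
  val bytesPerCore = totalBytes / math.max(minPartitionNum, 1)
  math.min(filesMaxPartitionBytes, math.max(filesOpenCostInBytes, bytesPerCore))
}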
@@ -221,19 +227,18 @@
       case MapredParquetInputFormat =>
       case _ =>
     }
-    val arrayFilePartition = ArrayBuffer[FilePartition]()
-    var initIndexPartition = indexPartition
+    val arrayFilePartition = ArrayBuffer[PartitionedFile]()
     for (i <- 0 until inputSplits.size) {
       val inputSplit = inputSplits(i)
       inputSplit match {
         case FileSplit =>
           val orcInputSplit = inputSplit.asInstanceOf[FileSplit]
-          arrayFilePartition += FilePartition(indexPartition, (PartitionedFile(partitionInternalRow,
-            SparkPath.fromPathString(orcInputSplit.getPath.toString), orcInputSplit.getStart, orcInputSplit.getLength) :: Nil).toArray)
-          initIndexPartition = initIndexPartition + 1
+          arrayFilePartition +=
+            Shims.get.getPartitionedFile(partitionInternalRow, orcInputSplit.getPath.toString,
+              orcInputSplit.getStart, orcInputSplit.getLength)
       }
     }
-    (arrayFilePartition, initIndexPartition)
+    arrayFilePartition
   }
 
   private def getInputFormat(conf: JobConf, inputFormatClass: Class[newInputClass[Writable, Writable]]):
@@ -250,10 +255,13 @@ case class NativeHiveTableScanExec(basedHiveScan: HiveTableScanExec)
 }
 
 object HiveTableUtil {
+  private val orcFormat = "OrcInputFormat"
+  private val parquetFormat = "MapredParquetInputFormat"
+
   def getFileFormat(inputFormatClass: Class[_ <: InputFormat[_, _]]): String = {
-    if (inputFormatClass.getSimpleName.equalsIgnoreCase("OrcInputFormat")) {
+    if (inputFormatClass.getSimpleName.equalsIgnoreCase(orcFormat)) {
       "orc"
-    } else if (inputFormatClass.getSimpleName.equalsIgnoreCase("MapredParquetInputFormat")) {
+    } else if (inputFormatClass.getSimpleName.equalsIgnoreCase(parquetFormat)) {
       "parquet"
     } else {
       "other"
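Editor's note: the rework above sorts PartitionedFiles by length descending and then hands them to Spark's FilePartition.getFilePartitions, which walks the files in order and closes a partition whenever the next file would push it past maxSplitBytes. A toy, self-contained illustration of that packing behaviour (not Spark's exact implementation, which also adds an open cost per file):

def packByLength(lengths: Seq[Long], maxBytes: Long): Seq[Seq[Long]] = {
  val closed = scala.collection.mutable.ArrayBuffer.empty[Seq[Long]]
  var current = List.empty[Long]
  var currentSize = 0L
  for (len <- lengths.sortBy(-_)) {        // largest files first, as in the patch
    if (current.nonEmpty && currentSize + len > maxBytes) {
      closed += current.reverse
      current = Nil
      currentSize = 0L
    }
    current = len :: current
    currentSize += len
  }
  if (current.nonEmpty) closed += current.reverse
  closed.toSeq
}

// e.g. packByLength(Seq(10L, 64L, 30L, 20L), maxBytes = 64) == Seq(Seq(64), Seq(30, 20, 10))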
diff --git a/spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronConverters.scala b/spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronConverters.scala
index 4f124bd8f..aad6422f2 100644
--- a/spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronConverters.scala
+++ b/spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronConverters.scala
@@ -16,55 +16,29 @@
  */
 package org.apache.spark.sql.auron
 
-import java.util.ServiceLoader
-
-import scala.annotation.tailrec
-import scala.collection.JavaConverters._
-import scala.collection.mutable
-
+import org.apache.auron.configuration.AuronConfiguration
+import org.apache.auron.jni.AuronAdaptor
+import org.apache.auron.metric.SparkMetricNode
+import org.apache.auron.protobuf.{EmptyPartitionsExecNode, PhysicalPlanNode}
+import org.apache.auron.spark.configuration.SparkAuronConfiguration
+import org.apache.auron.sparkver
 import org.apache.commons.lang3.reflect.MethodUtils
 import org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat
 import org.apache.spark.Partition
 import org.apache.spark.broadcast.Broadcast
-import org.apache.spark.internal.{config, Logging}
-import org.apache.spark.sql.auron.AuronConvertStrategy.{childOrderingRequiredTag, convertibleTag, convertStrategyTag, convertToNonNativeTag, isNeverConvert, joinSmallerSideTag, neverConvertReasonTag}
-import org.apache.spark.sql.auron.NativeConverters.{existTimestampType, isTypeSupported, roundRobinTypeSupported, StubExpr}
+import org.apache.spark.internal.{Logging, config}
+import org.apache.spark.sql.auron.AuronConvertStrategy._
+import org.apache.spark.sql.auron.NativeConverters.{StubExpr, existTimestampType, isTypeSupported, roundRobinTypeSupported}
 import org.apache.spark.sql.auron.join.JoinBuildSides.{JoinBuildLeft, JoinBuildRight, JoinBuildSide}
 import org.apache.spark.sql.auron.util.AuronLogUtils.logDebugPlanConversion
-import org.apache.spark.sql.catalyst.expressions.AggregateWindowFunction
-import org.apache.spark.sql.catalyst.expressions.Alias
-import org.apache.spark.sql.catalyst.expressions.Ascending
-import org.apache.spark.sql.catalyst.expressions.Attribute
-import org.apache.spark.sql.catalyst.expressions.AttributeReference
-import org.apache.spark.sql.catalyst.expressions.Expression
-import org.apache.spark.sql.catalyst.expressions.Literal
-import org.apache.spark.sql.catalyst.expressions.NamedExpression
-import org.apache.spark.sql.catalyst.expressions.SortOrder
-import org.apache.spark.sql.catalyst.expressions.WindowExpression
-import org.apache.spark.sql.catalyst.expressions.WindowSpecDefinition
-import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
-import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateFunction
-import org.apache.spark.sql.catalyst.expressions.aggregate.Final
-import org.apache.spark.sql.catalyst.expressions.aggregate.Partial
-import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
-import org.apache.spark.sql.catalyst.plans.physical.Partitioning
-import org.apache.spark.sql.catalyst.plans.physical.RangePartitioning
-import org.apache.spark.sql.catalyst.plans.physical.RoundRobinPartitioning
+import org.apache.spark.sql.catalyst.expressions.{AggregateWindowFunction, Alias, Ascending, Attribute, AttributeReference, Expression, Literal, NamedExpression, SortOrder, WindowExpression, WindowSpecDefinition}
+import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, AggregateFunction, Final, Partial}
+import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning, RangePartitioning, RoundRobinPartitioning}
 import org.apache.spark.sql.execution._
-import org.apache.spark.sql.execution.aggregate.HashAggregateExec
-import org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec
-import org.apache.spark.sql.execution.aggregate.SortAggregateExec
-import org.apache.spark.sql.execution.auron.plan.ConvertToNativeBase
-import org.apache.spark.sql.execution.auron.plan.NativeAggBase
-import org.apache.spark.sql.execution.auron.plan.NativeBroadcastExchangeBase
-import org.apache.spark.sql.execution.auron.plan.NativeOrcScanBase
-import org.apache.spark.sql.execution.auron.plan.NativeParquetScanBase
-import org.apache.spark.sql.execution.auron.plan.NativeSortBase
-import org.apache.spark.sql.execution.auron.plan.NativeUnionBase
-import org.apache.spark.sql.execution.auron.plan.Util
+import org.apache.spark.sql.execution.aggregate.{HashAggregateExec, ObjectHashAggregateExec, SortAggregateExec}
+import org.apache.spark.sql.execution.auron.plan._
 import org.apache.spark.sql.execution.command.DataWritingCommandExec
-import org.apache.spark.sql.execution.exchange.BroadcastExchangeExec
-import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
+import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, ShuffleExchangeExec}
 import org.apache.spark.sql.execution.joins._
 import org.apache.spark.sql.execution.window.WindowExec
 import org.apache.spark.sql.hive.execution.InsertIntoHiveTable
@@ -72,13 +46,9 @@ import org.apache.spark.sql.hive.execution.auron.plan.NativeHiveTableScanBase
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.LongType
 
-import org.apache.auron.configuration.AuronConfiguration
-import org.apache.auron.jni.AuronAdaptor
-import org.apache.auron.metric.SparkMetricNode
-import org.apache.auron.protobuf.EmptyPartitionsExecNode
-import org.apache.auron.protobuf.PhysicalPlanNode
-import org.apache.auron.spark.configuration.SparkAuronConfiguration
-import org.apache.auron.sparkver
+import java.util.ServiceLoader
+import scala.annotation.tailrec
+import scala.jdk.CollectionConverters.iterableAsScalaIterableConverter
 
 object AuronConverters extends Logging {
   def enableScan: Boolean =
@@ -231,7 +201,6 @@ object AuronConverters extends Logging {
         }
       }
       convertedAgg
-
       case e: SortAggregateExec if enableAggr => // sort aggregate
         val convertedAgg = tryConvert(e, convertSortAggregateExec)
         if (!e.getTagValue(convertibleTag).contains(true)) {
@@ -242,7 +211,6 @@ object AuronConverters extends Logging {
         }
       }
       convertedAgg
-
       case e: ExpandExec if enableExpand => // expand
         tryConvert(e, convertExpandExec)
      case e: WindowExec if enableWindow => // window
@@ -256,7 +224,6 @@ object AuronConverters extends Logging {
        tryConvert(e, convertLocalTableScanExec)
      case e: DataWritingCommandExec if enableDataWriting => // data writing
        tryConvert(e, convertDataWritingCommandExec)
-
      case exec: ForceNativeExecutionWrapperBase => exec
      case exec =>
        extConvertProviders.find(h => h.isEnabled && h.isSupported(exec)) match {
@@ -793,8 +760,8 @@ object AuronConverters extends Logging {
         child = convertProjectExec(ProjectExec(projections, exec.child)))
     } catch {
       case _: NoSuchMethodError =>
-        import scala.reflect.runtime.universe._
         import scala.reflect.runtime.currentMirror
+        import scala.reflect.runtime.universe._
         val mirror = currentMirror.reflect(exec)
         val copyMethod = typeOf[HashAggregateExec].decl(TermName("copy")).asMethod
         val params = copyMethod.paramLists.flatten
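Editor's note: the extConvertProviders lookup retained above is what lets HiveConvertProvider participate without AuronConverters depending on Hive classes. Its definition sits outside the hunks shown, so the following is only an assumed sketch of a ServiceLoader-based discovery; it would also require a META-INF/services/org.apache.spark.sql.auron.AuronConvertProvider resource naming HiveConvertProvider, which is not part of this diff. Note as well that the convertHiveTableScanExec added in the next hunk still has an empty body and will not type-check until it returns a SparkPlan.

import java.util.ServiceLoader
import scala.jdk.CollectionConverters.iterableAsScalaIterableConverter

import org.apache.spark.sql.auron.AuronConvertProvider

def loadConvertProviders(): Seq[AuronConvertProvider] =
  ServiceLoader.load(classOf[AuronConvertProvider]).asScala.toSeq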
@@ -1048,6 +1015,11 @@ object AuronConverters extends Logging {
     convertToNative(exec)
   }
 
+  def convertHiveTableScanExec(exec: HiveTableScanExec): SparkPlan = {
+    logDebugPlanConversion(exec)
+
+  }
+
   def convertDataWritingCommandExec(exec: DataWritingCommandExec): SparkPlan = {
     logDebugPlanConversion(exec)
     exec match {