2015-09-02 14:46:27,681-[TS] DEBUG Executor task launch worker-0 org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection - code for input[0, StringType],input[1, StringType],input[2, StringType],input[3, StringType],input[4, StringType]:
日志中的如下信息是如何产生的,这是列及其类型么?
input[0, StringType],input[1, StringType],input[2, StringType],input[3, StringType],input[4, StringType]
代码:
/**
* A projection that returns UnsafeRow.
*/
abstract class UnsafeProjection extends Projection {
//将IntervalRow转换为UnsafeRow
override def apply(row: InternalRow): UnsafeRow
}
2. UnsafeRow的pointTo方法
/**
* Update this UnsafeRow to point to different backing data.
*
* @param baseObject the base object
* @param baseOffset the offset within the base object
* @param numFields the number of fields in this row
* @param sizeInBytes the size of this row's backing data, in bytes
*/
public void pointTo(Object baseObject, long baseOffset, int numFields, int sizeInBytes) {
assert numFields >= 0 : "numFields (" + numFields + ") should >= 0";
this.bitSetWidthInBytes = calculateBitSetWidthInBytes(numFields);
this.baseObject = baseObject;
this.baseOffset = baseOffset;
this.numFields = numFields;
this.sizeInBytes = sizeInBytes;
}
package org.apache.spark.sql.test;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.catalyst.expressions.UnsafeRow;
import org.apache.spark.unsafe.Platform;
import org.apache.spark.unsafe.types.UTF8String;
///在Spark的代码中中没有UnsafeProjection的子类,UnsafeProjection的子类是动态生成的
//UnsafeProjection是一个抽象类,子类需要实现apply方法
class SpecificUnsafeProjection extends org.apache.spark.sql.catalyst.expressions.UnsafeProjection {
///涉及的expressions
private org.apache.spark.sql.catalyst.expressions.Expression[] expressions;
//apply方法返回的结果
private UnsafeRow convertedStruct10;
private byte[] buffer11;
private int cursor12;
public SpecificUnsafeProjection(org.apache.spark.sql.catalyst.expressions.Expression[] expressions) {
this.expressions = expressions;
this.convertedStruct10 = new UnsafeRow();
//buffer11是字节数组,长度为48
this.buffer11 = new byte[48];
this.cursor12 = 0;
}
// Scala.Function1 need this
// public Object apply(Object row) {
// return apply((InternalRow) row);
// }
public UnsafeRow apply(InternalRow i) {
//cursor12首先复制为48,这个更buffer11的长度一样
cursor12 = 48;
//这步操作是对convertedStruct10(UnsafeRow)的一些属性更新更新
//第二个参数是baseOffset,值是
//第三个参数(值5)是Row中的列数
//第四个参数cursor12表示这个Row的sizeInBytes(the size of this row's backing data, in bytes)
convertedStruct10.pointTo(buffer11, Platform.BYTE_ARRAY_OFFSET, 5, cursor12);
/* input[0, StringType] */
//第一列是否为null
boolean isNull0 = i.isNullAt(0);
//不为null,则通过调用getUTF8String获取其UTF8String数据
UTF8String primitive1 = isNull0 ? null : (i.getUTF8String(0));
//获取第一列的字节数(如果是null,则为0),加到cursor12上。
int numBytes14 = cursor12 + (isNull0 ? 0 : org.apache.spark.sql.catalyst.expressions.UnsafeRowWriters.UTF8StringWriter.getSize(primitive1));
//如果第一列不为null
if (buffer11.length < numBytes14) {
// This will not happen frequently, because the buffer is re-used.
//扩容
byte[] tmpBuffer13 = new byte[numBytes14 * 2];
//将buffer11的数据复制到tmpBuffer13,然后将tmpBuffer13复制给buffer11,此时buffer11完成了扩容的工作
Platform.copyMemory(buffer11, Platform.BYTE_ARRAY_OFFSET,
tmpBuffer13, Platform.BYTE_ARRAY_OFFSET, buffer11.length);
buffer11 = tmpBuffer13;
}
//更新值
convertedStruct10.pointTo(buffer11, Platform.BYTE_ARRAY_OFFSET, 5, numBytes14);
if (isNull0) {
convertedStruct10.setNullAt(0);
} else {
cursor12 += org.apache.spark.sql.catalyst.expressions.UnsafeRowWriters.UTF8StringWriter.write(convertedStruct10, 0, cursor12, primitive1);
}
/* input[1, StringType] */
boolean isNull2 = i.isNullAt(1);
UTF8String primitive3 = isNull2 ? null : (i.getUTF8String(1));
int numBytes15 = cursor12 + (isNull2 ? 0 : org.apache.spark.sql.catalyst.expressions.UnsafeRowWriters.UTF8StringWriter.getSize(primitive3));
if (buffer11.length < numBytes15) {
// This will not happen frequently, because the buffer is re-used.
byte[] tmpBuffer13 = new byte[numBytes15 * 2];
Platform.copyMemory(buffer11, Platform.BYTE_ARRAY_OFFSET,
tmpBuffer13, Platform.BYTE_ARRAY_OFFSET, buffer11.length);
buffer11 = tmpBuffer13;
}
convertedStruct10.pointTo(buffer11, Platform.BYTE_ARRAY_OFFSET, 5, numBytes15);
if (isNull2) {
convertedStruct10.setNullAt(1);
} else {
cursor12 += org.apache.spark.sql.catalyst.expressions.UnsafeRowWriters.UTF8StringWriter.write(convertedStruct10, 1, cursor12, primitive3);
}
/* input[2, StringType] */
boolean isNull4 = i.isNullAt(2);
UTF8String primitive5 = isNull4 ? null : (i.getUTF8String(2));
int numBytes16 = cursor12 + (isNull4 ? 0 : org.apache.spark.sql.catalyst.expressions.UnsafeRowWriters.UTF8StringWriter.getSize(primitive5));
if (buffer11.length < numBytes16) {
// This will not happen frequently, because the buffer is re-used.
byte[] tmpBuffer13 = new byte[numBytes16 * 2];
Platform.copyMemory(buffer11, Platform.BYTE_ARRAY_OFFSET,
tmpBuffer13, Platform.BYTE_ARRAY_OFFSET, buffer11.length);
buffer11 = tmpBuffer13;
}
convertedStruct10.pointTo(buffer11, Platform.BYTE_ARRAY_OFFSET, 5, numBytes16);
if (isNull4) {
convertedStruct10.setNullAt(2);
} else {
cursor12 += org.apache.spark.sql.catalyst.expressions.UnsafeRowWriters.UTF8StringWriter.write(convertedStruct10, 2, cursor12, primitive5);
}
/* input[3, StringType] */
boolean isNull6 = i.isNullAt(3);
UTF8String primitive7 = isNull6 ? null : (i.getUTF8String(3));
int numBytes17 = cursor12 + (isNull6 ? 0 : org.apache.spark.sql.catalyst.expressions.UnsafeRowWriters.UTF8StringWriter.getSize(primitive7));
if (buffer11.length < numBytes17) {
// This will not happen frequently, because the buffer is re-used.
byte[] tmpBuffer13 = new byte[numBytes17 * 2];
Platform.copyMemory(buffer11, Platform.BYTE_ARRAY_OFFSET,
tmpBuffer13, Platform.BYTE_ARRAY_OFFSET, buffer11.length);
buffer11 = tmpBuffer13;
}
convertedStruct10.pointTo(buffer11, Platform.BYTE_ARRAY_OFFSET, 5, numBytes17);
if (isNull6) {
convertedStruct10.setNullAt(3);
} else {
cursor12 += org.apache.spark.sql.catalyst.expressions.UnsafeRowWriters.UTF8StringWriter.write(convertedStruct10, 3, cursor12, primitive7);
}
/* input[4, StringType] */
boolean isNull8 = i.isNullAt(4);
UTF8String primitive9 = isNull8 ? null : (i.getUTF8String(4));
int numBytes18 = cursor12 + (isNull8 ? 0 : org.apache.spark.sql.catalyst.expressions.UnsafeRowWriters.UTF8StringWriter.getSize(primitive9));
if (buffer11.length < numBytes18) {
// This will not happen frequently, because the buffer is re-used.
byte[] tmpBuffer13 = new byte[numBytes18 * 2];
Platform.copyMemory(buffer11, Platform.BYTE_ARRAY_OFFSET,
tmpBuffer13, Platform.BYTE_ARRAY_OFFSET, buffer11.length);
buffer11 = tmpBuffer13;
}
convertedStruct10.pointTo(buffer11, Platform.BYTE_ARRAY_OFFSET, 5, numBytes18);
if (isNull8) {
convertedStruct10.setNullAt(4);
} else {
cursor12 += org.apache.spark.sql.catalyst.expressions.UnsafeRowWriters.UTF8StringWriter.write(convertedStruct10, 4, cursor12, primitive9);
}
return convertedStruct10;
}
}
分享到:
相关推荐
剥离的Parser模块,用于查看Spark SQL语法解析SQL后生成的语法树
Spark Core + Spark SQL + MongoDB 离线推荐 静态数据处理:Spark Core + Spark SQL 推荐服务:Spark Core + Spark MLlib 在线推荐 获取消息服务:Redis + Kafka 推荐服务:Spark Streaming 数据集格式 商品数据集...
* 通过whole-stage code generation(全流程代码生成)技术将spark sql和dataset的性能提升2~10倍 * 通过vectorization(向量化)技术提升parquet文件的扫描吞吐量 * 提升orc文件的读写性能 * 提升catalyst查询优化...
大量图示,生动形象,总共7个pdf,看完对spark的原理,运行机制以及后续性能调优有很大的帮助,这是第五个pdf,描述了spark任务部署图,Job的提交,逻辑执行图、物理执行图的生成,task的分配,Task的运行等
例子中定义了多个List数据集合,包括用户信息,订单信息,用户订单信息,将List对象生成DataFrame,使用SparkSQL查询将多个DataFrame合成一个DataFrame,使用Scala语言编写。
在Spark SQL执行etl时候会有最终结果大小只有几百k,但是小文件一个分区有上千的情况。危害: ... coalesce:coalesce()方法的作用是返回指定一个新的指定分区的Rdd,如果是生成一个窄依赖的结果
SparkSQLOnHBase利用spark sql在HBase上搭建的sql查询, 支持标准sql查询操作,后续有空闲时间会增加支持插入,删除,建表相关的ddl 语法(rowkey生成策略 部分尚未找到较好的解决方案,hbase查询 table也有待修改...
一:为什么sparkSQL? 3 1.1:sparkSQL的发展历程 3 1.1.1:hive and shark 3 1.1.2:Shark和sparkSQL 4 1.2:sparkSQL的性能 5 1.2.1:内存列存储(In-Memory Columnar Storage) 6 1.2.2:字节码生成技术...
spark-jobserver提供了一个RESTful接口,用于提交和管理作业,jar和作业上下文。 此存储库包含完整的Spark作业服务器项目,包括单元测试和部署脚本。 它最初始于 ,但现在是主要的开发仓库。 其他有用的链接:,, ...
Yardstick Apache Spark 是一组基于 Yardstick 框架编写的基准测试。 标尺框架 有关如何运行 Yardstick 基准测试和如何生成图表的详细信息,请访问。 下面的文档描述了除标准 Yardstick 参数之外的配置参数。 安装 ...
支持SQL: MySQL Flink SQL Spark SQL Hive SQL PL / SQL 提示:该项目是Javascript语言的默认项目,如果需要,您也可以尝试将其编译为其他语言。安装// use npmnpm i dt-sql-parser --save// use yarnyarn add dt-...
一个sql过来解析成unresolved,只拿出来字段名和表名但是不知道字段名和表名在哪个位置需要通过Schema确定表的位置等信息,生成逻辑执行计划,Logical,知道数据从哪里来了通过一些列优化过滤生成物理执行计划...
目标简便高效与使用类来管理Web和数据库请求的大型框架(如Laravel和Symphony)不同,Spark只是一小部分可重用的过程函数集合,这些过程函数包装和扩展了PHP标准库以提高其易用性。可读性和简洁性Spark是用语言实现...
比如我们使用 Spark SQL 去执行一些 SQL,这个 SQL 在最后生成了大量的文件。然后我们可以看到,这个 SQL 所有的 Spark Jobs 其实已经运行完成了,但是这个查询语句还在运行。通过日志,我们可以看到 driver 节点...
hive-vs-sparkSQL-perf-test:这是python源代码,用于从性能测试中生成结果的图形视图,以比较Hive和Saprk SQL。 此源代码是博客的一个伴侣
它提供了Java和Python API,可将FHIR资源转换为Spark数据集,然后可以利用该平台的全部功能(包括Spark SQL)对其进行探索。 有关详细信息,请参阅。建造本生使用构建和测试,并具有标准Maven生命周期来构建,安装...
sope-spark :该模块包含有用的Dataframe函数和一个Scala内部dsl库,该库有助于以简洁的方式编写Spark SQL转换。 sope-etl :此模块提供基于YAML的外部转换器,具有易于使用的ETL构造,其行为类似于配置/脚本驱动的...
解析层:用于解析,验证,优化SQL语句,拆分混合SQL并最终生成查询计划; 计算层:用于将查询计划路由到特定执行计划,然后解释为给定存储或引擎的可执行代码(例如Elasticsearch JSON查询或Hive HQL); 存储层...
想对比一下MySQL、GreenPlum、Elasticsearch、Hive、Spark SQL、Presto、Impala、Drill、HAWQ、Druid、Pinot、Kylin、ClickHouse、Kudu等不同实现方案之间的表现,那你就需要一份标准的数据进行测试,这个开源项目...
如下所示: ...补充知识:Sql语句实现不同记录同一属性列的差值计算 所使用的表的具体结构如下图所示 Table中主键是(plateNumber+currentTime) 要实现的查询是: 给定车牌号和查询的时间区间,查询给