网站首页 > 技术文章 正文
前序
Greenplum是目前比较优秀的mpp数据库,其官方推荐了几种将外部数据写入Greenplum方式,包含:通用的Jdbc,pgcopy和pgload以及Pivotal Greenplum-Spark Connector等。
- Jdbc:Jdbc方式,写大数据量会很慢。
- pgcopy:其中pgcopy是及其不推荐的一种,因为其写数据必须经过Greenplum的master,因此也只建议小数据量使用。
- pgload:适合写大数据量数据,能并行写入。但其缺点是需要安装客户端,包括gpfdist等依赖,安装起来很麻烦。需要了解可以参考pgload。
- Greenplum-Spark Connector:基于Spark并行处理,并行写入Greenplum,并提供了并行读取的接口。也是接下来该文重点介绍的部分。
1. Greenplum-Spark Connector是啥
Greenplum-Spark Connector(GSC)是Pivotal提供的一个高性能并行的读写Greenplum的工具。不需要去安装麻烦的Greenplum loader客户端,也不用去实现繁琐的copy代码。
2. Greenplum-Spark Connector读数据架构
一个Spark application,是由Driver和Executor节点构成。当Spark application使用Greenplum-Spark Connector加载Greenplum数据时,其Driver端会通过JDBC的方式请求Greenplum的master节点获取相关的元数据信息。Connector将会根据这些元数据信息去决定Spark的Executor去怎样去并行的读取该表的数据。
Greenplum数据库存储数据是按segment组织的,Greenplum-Spark Connector在加载Greenplum数据时,需要指定Greenplum表的一个字段作为Spark的partition字段,Connector会使用这个字段的值来计算,该Greenplum表的某个segment该被哪一个或多个Spark partition读取。
其读取过程如下:
- Spark Driver通过Jdbc的方式连接Greenplum master,并读取指定表的相关元数据信息。然后根据指定的分区字段以及分区个数去决定segment怎么分配。
- Spark Executor端会通过Jdbc的方式连接Greenplum master,创建Greenplum外部表。
- 然后Spark Executor通过Http方式连接Greenplum的数据节点,获取指定的segment的数据。该获取数据的操作在Spark Executor并行执行。
其示意流程图如下:
3. Greenplum-Spark Connector写数据流程
- GSC在Spark Executor端通过Jetty启动一个Http服务,将该服务封装为支持Greenplum的gpfdist协议。
- GSC在Spark Executor端通过Jdbc方式连接Greenplum master,创建Greenplum外部表,该外部表文件地址指向该Executor所启动的gpfdist协议地址。SQL示例如下:
CREATE READABLE EXTERNAL TABLE"public"."spark_9dc823a6fa48df60_3d9d854163f8f07a_1_42" (LIKE "public"."rank_a1")LOCATION ('gpfdist://10.0.8.145:44772/spark_9dc823a6fa48df60_3d9d854163f8f07a_1_42')FORMAT 'CSV'(DELIMITER AS '|' NULL AS '')ENCODING 'UTF-8'
- GSC在Spark Executor端通过Jdbc方式连接Greenplum master,然后执行insert语句至真实的表中,数据来源于这张外部表。SQL示例如下:
INSERT INTO "public"."rank_a1"SELECT *FROM "public"."spark_9dc823a6fa48df60_3d9d854163f8f07a_1_42"
至于这张外部表的数据,是否落地当前Executor服务器,不清楚。猜测不会落地,而是直接通过Http直接传递给了Greenplum对应的Segment。
- GSC监听onApplicationEnd事件,在Spark application结束后,删除创建的外部表。
4. Greenplum-Spark Connector使用
- 下载GSC Jar包。 下载地址:Pivotal Network。 可直接下载最新版本的GSC即1.6.2,支持Greenplum5.0之后的版本。greenplum-spark_-.jar,如:
greenplum-spark_2.11-1.6.2.jar
- maven中引入:
<dependency>
<groupId>io.pivotal.greenplum.spark</groupId>
<artifactId>greenplum-spark_2.11</artifactId>
<version>1.6.2</version>
</dependency>
- spark提交引入:
- spark-shell或spark-submit时候,通过--jars加入greenplum-spark_2.11-1.6.2.jar。
- 将greenplum-spark_2.11-1.6.2.jar与Spark application包打成 uber jar 提交。
5. Greenplum-Spark Connector参数
参数名:url
参数描述:Jdbc连接的url。
作用域:读,写
参数名:dbschema
参数描述:Greenplum数据库的schema,GSC创建的临时外部表也在该schema下,默认值为public。
作用域:读,写
参数名:dbtable
参数描述:Greenplum数据库的表名,GSC在读取时,会读取dbschema下的表。GSC在写数据时,如果该表不存在会自动创建。
作用域:读,写
参数名:driver
参数描述:Jdbc driver全类名,非必填,在GSC Jar包中已经包含了driver包。
作用域:读,写
参数名:user
参数描述:用户名
作用域:读,写
参数名:password
参数描述:密码
作用域:读,写
参数名:partitionColumn
参数描述:Greenplum数据表的字段,该字段将作为Spark分区的字段,支持integer, bigint, serial, bigserial4中类型,该字段名需小写。该字段为必填,且必须是Greenplum表建表时 DISTRIBUTED BY ()语句中的字段。
作用域:读
参数名:partitions
参数描述:Spark分区数,非必填,其默认值为Greenplum的primary segments数量。
作用域:读
参数名:truncate
参数描述:当在Spark中指定了输出模式为SaveMode.Overwrite时候,写的目标表存在的时候的策略,非必填。默认为false,即GSC将会先删除然后重新创建目标表,然后在写数据。当为true时,GSC将会先truncates目标表,然后在写入数据。
作用域:写
参数名:iteratorOptimization
参数描述:指定写数据时内存模式,非必填。默认指为true,GSC将会使用 Iterator 方式。当为false时,GSC将会在写数据时将数据存储在内存中。
作用域:写
参数名:server.port
参数描述:指定在Spark Worker端启动gpfdist服务的端口号,非必填。默认情况下会使用随机的端口号。
作用域:读,写
参数名:server.useHostname
参数描述:指定是否使用Spark Worker节点的host name为gpfdis服务的地址,非必填。默认为false。
作用域:读,写
参数名:pool.maxSize
参数描述:GSC连接Greenplum的连接池的最大连接数,默认为64。
作用域:读,写
参数名:pool.timeoutMs
参数描述:非活动连接被认为是空闲连接的时间,毫秒值。默认为10000(10秒)。
作用域:读,写
参数名:pool.minIdle
参数描述:GSC连接Greenplum的连接池的最小空闲连接数,默认为0。
作用域:读,写
6. 从Greenplum读取数据
- DataFrameReader.load()方式:
val gscReadOptionMap = Map( "url" -> "jdbc:postgresql://gpdb-master:5432/testdb", "user" -> "bill", "password" -> "changeme", "dbschema" -> "myschema", "dbtable" -> "table1", "partitionColumn" -> "id")val gpdf = spark.read.format("greenplum") .options(gscReadOptionMap) .load()
- spark.read.greenplum()方式:
val url = "jdbc:postgresql://gpmaster.domain:15432/tutorial"val tblname = "avgdelay"val jprops = new Properties()jprops.put("user", "user2")jprops.put("password", "changeme")jprops.put("partitionColumn", "airlineid")val gpdf = spark.read.greenplum(url, tblname, jprops)
然鹅,这种方式必然需要引入一个隐式转换,官网也没介绍。
7. 写数据至Greenplum
7.1. 写数据示例:
val gscWriteOptionMap = Map("url" -> "jdbc:postgresql://gpdb-master:5432/testdb", "user" -> "bill", "password" -> "changeme", "dbschema" -> "myschema", "dbtable" -> "table2",)dfToWrite.write.format("greenplum") .options(gscWriteOptionMap) .save()
在通过GSC写到Greenplum表时,如果表已经存在或表中已经存在数据,可通过DataFrameWriter.mode(SaveMode savemode)方式指定其输出模式。相关模式行为如下:
SaveMode:ErrorIfExists
行为:如果Greenplum数据表已经存在则GSC直接返回错误,该策略为默认策略。
SaveMode:Append
行为:直接将Spark中数据追加至表中。
SaveMode:Ignore
行为:如果Greenplum数据表已经存在,GSC将不会写数据至表中也不会去修改已经存在的数据。
SaveMode:Overwrite
行为:如果Greenplum数据表已经存在,则truncate参数将会生效。默认为false,即GSC将会先删除然后重新创建目标表,然后在写数据。当为true时,GSC将会先truncates目标表,然后在写入数据。
7.2. GSC自动建表:
- 创建的Greenplum表将不会有distribution列,如下为GSC生成的建表语句:
CREATE TABLE "public"."rank_a1" ("id" INTEGER NOT NULL, "rank" TEXT, "year" INTEGER NOT NULL, "gender" INTEGER NOT NULL, "count" INTEGER NOT NULL);
- 创建的Greenplum表的字段名将会使用Spark DataFrame中的字段名。
- 在GSC自动建表时,将会为字段名加上双引号,这将使Greenplum区分大小写。
- 当Spark DataFrame的字段不为nullable时,GSC自动建表的字段将是 NOT NULL。
- 将会对应的Spark DataFrame字段类型映射为Greenplum的字段类型。参考,字段类型映射表。
7.3. 提前手动建表:
- 将Spark DataFrame的字段名的数据写至Greenplum表的对应的字段中。值得注意的是,GSC在做映射的时候,是严格区分大小写的。
- 写至Greenplum的字段的数据类型,与对应的Spark DataFrame一致,具体参见字段类型映射。
- 如果Spark数据中某列包含空数据,需确保对应的Greenplum表的列没有被指定为NOT NULL。
- Greenplum表中建表时其字段顺序可以与Spark DataFrame中不一致。但Greenplum表中不能出现不存在在Spark DataFrame中的字段。如下例子:
// Greenplum 中的字段CREATE TABLE public.rank_a1 ( id int4 NOT NULL, "rank" text NULL, "year" int4 NOT NULL, gender int4 NOT NULL, count int4 NOT NULL)DISTRIBUTED BY (id);// Spark DataFrame中的字段var df = Seq((2, "a|b", 2, 2, 2),(3, "a|b", 3, 3, 3)).toDF("id", "rank", "year", "gender")// 在写数据至public.rank_a1表时,将会报错如下Exception in thread "main" java.lang.IllegalArgumentException: requirement failed: The number of columns doesn't match.Old column names (5): _1, _2, _3, _4, _5New column names (4): id, rank, year, gender at scala.Predef$.require(Predef.scala:224) at org.apache.spark.sql.Dataset.toDF(Dataset.scala:435) at org.apache.spark.sql.DatasetHolder.toDF(DatasetHolder.scala:44) at com.lt.spark.greenplum.GreenplumWrite$.main(GreenplumWrite.scala:14) at com.lt.spark.greenplum.GreenplumWrite.main(GreenplumWrite.scala)
- 确保指定的用户对于该表有读写的权限,自动建表,需要有建表的权限。
8. Troubleshooting
8.1. 端口相关问题
原因解决办法
错误信息:java.lang.RuntimeException: is not a valid port number.
解决办法:通过server.port所指定的端口无效,比如1024以内,为系统使用端口指定端口在[1024-65535]之间
错误信息:java.lang.RuntimeException:Unable to start GpfdistService on any of ports=
解决办法:通过server.port指定的端口已经被占用从新指定一个未被占用的端口,或不指定该参数
8.2. Greenplum连接数问题
当连接Greenplum的连接数接近Greenplum数据库配置的最大连接数(max_connections)时。Spark application将会抛出 connection limit exceeded 错误。
排查过程:
- 查询Greenplum数据的最大连接数:
postgres=# show max_connections; max_connections----------------- 250(1 row)
- 查询当前连接Greenplum数据库的连接数:
postgres=# SELECT count(*) FROM pg_stat_activity;
- 查询指定的用户连接Greenplum数据的连接数:
postgres=# SELECT count(*) FROM pg_stat_activity WHERE datname='tutorial';postgres=# SELECT count(*) FROM pg_stat_activity WHERE usename='user1';
- 查询Greenplum数据库空闲和活动的连接数:
postgres=# SELECT count(*) FROM pg_stat_activity WHERE current_query='<IDLE>';postgres=# SELECT count(*) FROM pg_stat_activity WHERE current_query!='<IDLE>';
- 查询连接Greenplum数据库名,用户名,客户端地址,客户端ip,当前查询语句:
postgres=# SELECT datname, usename, client_addr, client_port, current_query FROM pg_stat_activity;
如果确认是Spark application使用连接数过多,则配置JDBC Connection Pooling相关参数,减少连接数。
8.3. Greenplum Database Data Length Errors
在使用Greenplum 4.x或5.x的时候,可能会报出“data line too long”错误。这是因为在Greenplum数据库中参数项“gpmaxcsvlinelength”默认值是1M。需要登陆Greenplum master修改这个参数值,示例如下,通过gpconfig修改该参数的值为5M:
gpadmin@gpmaster$ gpconfig -c gp_max_csv_line_length -v 5242880gpadmin@gpmaster$ gpstop -u
9. 参考
- Greenplum-Spark Connector官方文档
- Greenplum建表语句文档
- Greenplum参数配置官方文档
猜你喜欢
- 2024-11-26 使用flask+echarts+html+Ajax实现数据分析可视化看板
- 2024-11-26 Excel之基础 - 常用函数整理来了,现用现查
- 2024-11-26 简单通俗说PageRank
- 2024-11-26 SQLSERVER全文检索(FULL-TEXT)语法
- 2024-11-26 「NLP」文本关键词提取的两种方法-TFIDF和TextRank
- 2024-11-26 职场办公中每天都要使用的6个Excel函数公式
- 2024-11-26 威廉王子在“神秘机构”待了三周...出来后变成了这样?
- 2024-11-26 Day60:用Python解析XML文件(xml.etree.ElementTree)
- 2024-11-26 Python 与 Excel 不得不说的事:这几个常用库你知道几个?
- 2024-11-26 职场人必备的15个excel函数公式,简单易懂,快速提高工作效率
你 发表评论:
欢迎- 最近发表
- 标签列表
-
- oraclesql优化 (66)
- 类的加载机制 (75)
- feignclient (62)
- 一致性hash算法 (71)
- dockfile (66)
- 锁机制 (57)
- javaresponse (60)
- 查看hive版本 (59)
- phpworkerman (57)
- spark算子 (58)
- vue双向绑定的原理 (68)
- springbootget请求 (58)
- docker网络三种模式 (67)
- spring控制反转 (71)
- data:image/jpeg (69)
- base64 (69)
- java分页 (64)
- kibanadocker (60)
- qabstracttablemodel (62)
- java生成pdf文件 (69)
- deletelater (62)
- com.aspose.words (58)
- android.mk (62)
- qopengl (73)
- epoch_millis (61)
本文暂时没有评论,来添加一个吧(●'◡'●)