Spark写入GreenPlum
、测试方式
为了测试在Spark框架下以不同方式将数据插入GreenPlum的性能,这里需要准备足够量的数据。我采用的是gzip格式压缩的Hdfs数据,在这种压缩格式下数据量约1.2G,数据条目共34946664条,测试中会插入一个GreenPlum表中,按其中一个字段作为分区存入三个表分区中。
输出表
1、DataFrame.write()
支持:该方式使用greenplum-jdbc包,通过JDBC连接来向gp库插入数据。就是通过dataFrame.write().jdbc()的形式将数据写入gp库。
效率:但是其写入的速度非常慢,写入上述数据到GP库包括数据的初始化(并行初始化时间非常短,可忽略)的耗时为1.7h这个级别,约每秒写入6000条数据(MySql数据库也维持在这种写入效率)。在大数据量的时候实在不建议使用这种方式写入。
2、CopyManager.copyIn()
支持:该方式使用postgresql的JDBC包,利用了postgresql的特性。使用包中提供的CopyManager将字节流直接写入数据库中,这种写入的方式无法提供事务处理,当然我们也不需要事务处理。
Dataset df = getData().persist(StorageLevel.MEMORY_AND_DISK());df.foreachPartition(new ForeachPartitionFunction<Location>() {// URLString url = GREENPLUM_URL;// 数据库用户名String username = GREENPLUM_USER;// 数据库密码String password = GREENPLUM_AUTH;@Overridepublic void call(Iterator<Location> iterator) throws Exception {// 加载驱动Class.forName("org.postgresql.Driver");// 获取连接Connection connection = DriverManager.getConnection(url, username, password);CopyManager copyManager = new CopyManager((BaseConnection) connection);StringBuilder sb = new StringBuilder();while(iterator.hasNext()){Location row = iterator.next();sb.append(row.toString()+"\r\n");}ByteArrayInputStream inputStream = new ByteArrayInputStream(sb.toString().getBytes(StandardCharsets.UTF_8));copyManager.copyIn("COPY tb_gp_test (imsi,stime,etime,longitude,latitude,cityid,eci) FROM STDIN",inputStream);connection.close();}});}
效率:其写入GP库的效率很高,写入上述数据到GP库(包括数据初始化的耗时)为4min到6分钟这个级别。
3、Greenplum-Spark Connector
支持:该方式只支持Spark2.x以上的版本,需要导入greenplum-spark包,我在使用Maven时没发现有该包,因此需要自己下载并导入Maven私库中使用。对应不同greenplum版本的包下载地址是:
https://network.pivotal.io/products/pivotal-gpdb/releases。
Greenplum-Spark Connector版本对应
效率:但是其写入GP库的效率很高,写入上述数据到GP库包括数据的初始化的耗时为5min到6min这个级别。与copyIn的方式效率差不多,而且更适配于Spark的并行式计算方式,问题在于这个包只能支持比较高版本的Spark,老集群只能用copyIn的方式来提升效率了。
来源 https://www.modb.pro/db/330771
相关文章