博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
37、数据源之通用的load和save操作
阅读量:5220 次
发布时间:2019-06-14

本文共 4330 字,大约阅读时间需要 14 分钟。

一、通用的load和save操作

1、概述

对于Spark SQL的DataFrame来说,无论是从什么数据源创建出来的DataFrame,都有一些共同的load和save操作。load操作主要用于加载数据,创建出DataFrame;save操作,主要用于将DataFrame中的数据保存到文件中。
Java版本DataFrame df = sqlContext.read().load("users.parquet");df.select("name", "favorite_color").write().save("namesAndFavColors.parquet");Scala版本val df = sqlContext.read.load("users.parquet")df.select("name", "favorite_color").write.save("namesAndFavColors.parquet")

2、java实现

package cn.spark.study.sql;import org.apache.spark.SparkConf;import org.apache.spark.api.java.JavaSparkContext;import org.apache.spark.sql.DataFrame;import org.apache.spark.sql.SQLContext;/** * 通用的load和save操作 * @author Administrator * */public class GenericLoadSave {    public static void main(String[] args) {        SparkConf conf = new SparkConf()                 .setAppName("GenericLoadSave");        JavaSparkContext sc = new JavaSparkContext(conf);        SQLContext sqlContext = new SQLContext(sc);            DataFrame usersDF = sqlContext.read().load(                "hdfs://spark1:9000/users.parquet");        usersDF.select("name", "favorite_color").write()                .save("hdfs://spark1:9000/namesAndFavColors.parquet");       }    }

3、scala实现

package cn.spark.study.sqlimport org.apache.spark.SparkConfimport org.apache.spark.SparkContextimport org.apache.spark.sql.SQLContextimport org.apache.spark.sql.DataFrame/** * @author Administrator */object GenericLoadSave {    def main(args: Array[String]): Unit = {    val conf = new SparkConf()        .setAppName("GenericLoadSave")    val sc = new SparkContext(conf)    val sqlContext = new SQLContext(sc)      val usersDF = sqlContext.read.load("hdfs://spark1:9000/users.parquet")    usersDF.write.save("hdfs://spark1:9000/namesAndFavColors_scala")    }  }

二、手动指定数据源类型

1、java实现

package cn.spark.study.sql;import org.apache.spark.SparkConf;import org.apache.spark.api.java.JavaSparkContext;import org.apache.spark.sql.DataFrame;import org.apache.spark.sql.SQLContext;/** * 手动指定数据源类型 * @author Administrator * */ public class ManuallySpecifyOptions {    public static void main(String[] args) {        SparkConf conf = new SparkConf()                   .setAppName("ManuallySpecifyOptions");        JavaSparkContext sc = new JavaSparkContext(conf);        SQLContext sqlContext = new SQLContext(sc);                DataFrame peopleDF = sqlContext.read().format("json")                .load("hdfs://spark1:9000/people.json");        peopleDF.select("name").write().format("parquet")                  .save("hdfs://spark1:9000/peopleName_java");         }    }

2、scala实现

package cn.spark.study.sqlimport org.apache.spark.SparkConfimport org.apache.spark.SparkContextimport org.apache.spark.sql.SQLContext/** * @author Administrator */object ManuallySpecifyOptions {    def main(args: Array[String]): Unit = {    val conf = new SparkConf()        .setAppName("ManuallySpecifyOptions")      val sc = new SparkContext(conf)    val sqlContext = new SQLContext(sc)      val peopleDF = sqlContext.read.format("json").load("hdfs://spark1:9000/people.json")    peopleDF.select("name").write.format("parquet").save("hdfs://spark1:9000/peopleName_scala")     }  }

三、Save Mode

1、概述

Spark SQL对于save操作,提供了不同的save mode。主要用来处理,当目标位置,已经有数据时,应该如何处理。而且save操作并不会执行锁操作,并且不是原子的,因此是有一定风险出现脏数据的。

      save mode

                           意义

SaveMode.ErrorIfExists (默认)

如果目标位置已经存在数据,那么抛出一个异常

SaveMode.Append

如果目标位置已经存在数据,那么将数据追加进去

SaveMode.Overwrite

如果目标位置已经存在数据,那么就将已经存在的数据删除,用新数据进行覆盖

SaveMode.Ignore

如果目标位置已经存在数据,那么就忽略,不做任何操作。

2、java实现

package cn.spark.study.sql;import org.apache.spark.SparkConf;import org.apache.spark.api.java.JavaSparkContext;import org.apache.spark.sql.DataFrame;import org.apache.spark.sql.SQLContext;import org.apache.spark.sql.SaveMode;/** * SaveModel示例 * @author Administrator * */public class SaveModeTest {    @SuppressWarnings("deprecation")    public static void main(String[] args) {        SparkConf conf = new SparkConf()                   .setAppName("SaveModeTest");        JavaSparkContext sc = new JavaSparkContext(conf);        SQLContext sqlContext = new SQLContext(sc);                DataFrame peopleDF = sqlContext.read().format("json")                .load("hdfs://spark1:9000/people.json");         peopleDF.save("hdfs://spark1:9000/people_savemode_test", "json", SaveMode.Append);    }    }

转载于:https://www.cnblogs.com/weiyiming007/p/11274767.html

你可能感兴趣的文章
peewee 事物 回滚
查看>>
013 CephFS文件系统
查看>>
<Core Java> 4.4 静态域和静态方法
查看>>
js中const,var,let区别
查看>>
ubuntu 如何卸载 nginx
查看>>
JS之放大镜效果
查看>>
基于UML网络教学管理平台模型的搭建
查看>>
不同的色深条件(8、16、24、32),像素绘制方式
查看>>
二分法原理
查看>>
push指令的执行过程
查看>>
陌生的熟悉地
查看>>
python 案例 011(猜数值)
查看>>
新写PHP HTTP断点续传类文件代码
查看>>
MySQL数据库索引(上)
查看>>
使用expect实现自动登录的脚本
查看>>
【转载】SSD断电保护原理
查看>>
[转]Struts2理解--动态方法和method属性及通配符_默认Action
查看>>
编写一个程序,指定一个文件夹,能自动计算出其总容量
查看>>
排球积分程序
查看>>
表关联ID相同数据update修改
查看>>