I. Generic load and save operations

1. Overview

Whatever data source a Spark SQL DataFrame was created from, the same generic load and save operations are available on it. The load operation reads data and creates a DataFrame; the save operation writes the data in a DataFrame out to a file. When no format is specified, both use the default data source, Parquet (configurable via the spark.sql.sources.default property).
Java version:

DataFrame df = sqlContext.read().load("users.parquet");
df.select("name", "favorite_color").write().save("namesAndFavColors.parquet");

Scala version:

val df = sqlContext.read.load("users.parquet")
df.select("name", "favorite_color").write.save("namesAndFavColors.parquet")
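Either way, the save produces a Parquet directory that can be loaded back like any other data source. A minimal round-trip check in Scala (the path matches the save() call above):

val saved = sqlContext.read.load("namesAndFavColors.parquet")
saved.printSchema()  // schema is recovered from the Parquet metadata
saved.show()         // prints the first rows as a table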
2. Java implementation
package cn.spark.study.sql;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;

/**
 * Generic load and save operations
 * @author Administrator
 */
public class GenericLoadSave {

    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
                .setAppName("GenericLoadSave");
        JavaSparkContext sc = new JavaSparkContext(conf);
        SQLContext sqlContext = new SQLContext(sc);

        // load() with no explicit format reads the default source (Parquet)
        DataFrame usersDF = sqlContext.read().load(
                "hdfs://spark1:9000/users.parquet");
        // save() with no explicit format likewise writes Parquet
        usersDF.select("name", "favorite_color").write()
                .save("hdfs://spark1:9000/namesAndFavColors.parquet");
    }
}
3. Scala implementation
package cn.spark.study.sql

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.DataFrame

/**
 * @author Administrator
 */
object GenericLoadSave {

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("GenericLoadSave")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)

    val usersDF = sqlContext.read.load("hdfs://spark1:9000/users.parquet")
    usersDF.write.save("hdfs://spark1:9000/namesAndFavColors_scala")
  }
}
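Note that neither implementation calls setMaster(), so the master URL is expected to come from outside the program, typically via the --master flag when the job is submitted with spark-submit.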
II. Manually specifying the data source type

When the data is not in the default Parquet format, the source type can be specified manually with format(), using a built-in short name such as "json", "parquet", or "jdbc", or a fully qualified data source class name. The examples below read JSON and write Parquet explicitly.

1. Java implementation
package cn.spark.study.sql;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;

/**
 * Manually specifying the data source type
 * @author Administrator
 */
public class ManuallySpecifyOptions {

    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
                .setAppName("ManuallySpecifyOptions");
        JavaSparkContext sc = new JavaSparkContext(conf);
        SQLContext sqlContext = new SQLContext(sc);

        // Read JSON explicitly, then write the result out as Parquet
        DataFrame peopleDF = sqlContext.read().format("json")
                .load("hdfs://spark1:9000/people.json");
        peopleDF.select("name").write().format("parquet")
                .save("hdfs://spark1:9000/peopleName_java");
    }
}
2. Scala implementation
package cn.spark.study.sql

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext

/**
 * @author Administrator
 */
object ManuallySpecifyOptions {

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("ManuallySpecifyOptions")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)

    val peopleDF = sqlContext.read.format("json").load("hdfs://spark1:9000/people.json")
    peopleDF.select("name").write.format("parquet").save("hdfs://spark1:9000/peopleName_scala")
  }
}
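format() also accepts non-file sources. As a further illustration, here is a hedged sketch of reading from a relational database through the built-in jdbc source, assuming Spark 1.4+ where DataFrameReader.option() is available; the connection URL, table name, and credentials below are placeholders, not values from this tutorial:

val jdbcDF = sqlContext.read.format("jdbc")
  .option("url", "jdbc:mysql://spark1:3306/testdb")  // placeholder connection URL
  .option("dbtable", "people")                       // placeholder table name
  .option("user", "root")                            // placeholder credentials
  .option("password", "root")
  .load()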
III. Save Mode

1. Overview

For the save operation, Spark SQL provides several save modes, which control what happens when data already exists at the target location. Note that a save does not take any locks and is not atomic, so there is some risk of ending up with dirty data (for example, if a job fails partway through an overwrite).
save mode | meaning |
SaveMode.ErrorIfExists (default) | If data already exists at the target location, throw an exception |
SaveMode.Append | If data already exists at the target location, append the new data to it |
SaveMode.Overwrite | If data already exists at the target location, delete it and replace it with the new data |
SaveMode.Ignore | If data already exists at the target location, do nothing and leave the existing data untouched |
2. Java implementation
package cn.spark.study.sql;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.SaveMode;

/**
 * SaveMode example
 * @author Administrator
 */
public class SaveModeTest {

    @SuppressWarnings("deprecation")
    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
                .setAppName("SaveModeTest");
        JavaSparkContext sc = new JavaSparkContext(conf);
        SQLContext sqlContext = new SQLContext(sc);

        DataFrame peopleDF = sqlContext.read().format("json")
                .load("hdfs://spark1:9000/people.json");
        // Deprecated save(path, source, mode) overload; Append keeps any existing data
        peopleDF.save("hdfs://spark1:9000/people_savemode_test",
                "json", SaveMode.Append);
    }
}
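The save(path, source, mode) call above is the old DataFrame API and is marked deprecated in later Spark 1.x releases, which is why the @SuppressWarnings annotation is needed. A minimal sketch of the equivalent call through the DataFrameWriter API, in Scala and assuming Spark 1.4+ where write.mode() is available:

import org.apache.spark.sql.SaveMode

// Same write as above, expressed through the non-deprecated writer API
peopleDF.write
  .format("json")
  .mode(SaveMode.Append)  // append instead of failing when the target already exists
  .save("hdfs://spark1:9000/people_savemode_test")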