好程序员大数据教程:SparkShell和IDEA中编写Spark程序,spark-shell是Spark自带的交互式Shell程序,方便用户进行交互式编程,用户可以在该命令行下用Scala编写Spark程序。spark-shell程序一般用作Spark程序测试练习来用。spark-shell属于Spark的特殊应用程序,我们可以在这个特殊的应用程序中提交应用程序
spark-shell启动有两种模式,local模式和cluster模式,分别为
local模式:
spark-shell
local模式仅在本机启动一个SparkSubmit进程,没有与集群建立联系,虽然进程中有SparkSubmit但是不会被提交到集群红
Cluster模式(集群模式):
spark-shell \
--master spark://hadoop01:7077 \--executor-memory 512m \--total-executor-cores 1后两个命令不是必须的 --master这条命令是必须的(除非在jar包中已经指可以不指定,不然就必须指定)退出shell
千万不要ctrl+c spark-shell 正确退出 :quit 千万不要ctrl+c退出 这样是错误的 若使用了ctrl+c退出 使用命令查看监听端口 netstat - apn | grep 4040 在使用kill -9 端口号 杀死即可
3.25.11 spark2.2shell和spark1.6shell对比
ps:启动spark-shell若是集群模式,在webUI会有一个一直执行的任务
通过IDEA创建Spark工程
ps:工程创建之前步骤省略,在scala中已经讲解,直接默认是创建好工程的
对工程中的pom.xml文件配置
<!-- 声明公有的属性 -->
<!-- 声明并引入公有的依赖 -->
Spark实现WordCount程序
Scala版本
import org.apache.spark.rdd.RDDimport org.apache.spark.{SparkConf, SparkContext}object SparkWordCount { def main(args: Array[String]): Unit = { val conf = new SparkConf().setAppName("dri/wordcount").setMaster("local[*]") //创建sparkContext对象
//通过sparkcontext对象就可以处理数据
//读取文件 参数是一个String类型的字符串 传入的是路径
//切分数据
//将每一个单词生成元组 (单词,1)
//spark中提供一个算子 reduceByKey 相同key 为一组进行求和 计算value
//对当前这个结果进行排序 sortBy 和scala中sotrBy是不一样的 多了一个参数
//默认是升序 false就是降序
//将数据提交到集群存储 无法返回值
//回收资源停止sc,结束任务
Java版本
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;import org.apache.spark.api.java.JavaRDD;import org.apache.spark.api.java.JavaSparkContext;import org.apache.spark.api.java.function.FlatMapFunction;import org.apache.spark.api.java.function.Function2;import org.apache.spark.api.java.function.PairFunction;import scala.Tuple2;import java.util.Arrays;import java.util.Iterator;import java.util.List;public class JavaWordCount { public static void main(String[] args) { //1.先创建conf对象进行配置主要是设置名称,为了设置运行模式
//2.创建context对象
//进行切分数据 flatMapFunction是具体实现类
//将数据生成元组
//第一个泛型是输入的数据类型 后两个参数是输出参数元组的数据
JavaPairRDDtuples = words.mapToPair(new PairFunction () { @Override public Tuple2 call(String s) throws Exception { return new Tuple2 (s, 1); } });复制代码
//聚合
JavaPairRDDsumed = tuples.reduceByKey(new Function2 () { 复制代码
@Override
//第一个Integer是相同key对应的value
//第二个Integer是相同key 对应的value
public Integer call(Integer v1, Integer v2) throws Exception { return v1 + v2; } });复制代码
//因为Java api没有提供sortBy算子,此时需要将元组中的数据进行位置调换,然后在排序,排完序在换回
//第一次交换是为了排序
JavaPairRDDswaped = sumed.mapToPair(new PairFunction , Integer, String>() { @Override public Tuple2 call(Tuple2 tup) throws Exception { return tup.swap(); } });复制代码
//排序
JavaPairRDDsorted = swaped.sortByKey(false);复制代码
//第二次交换是为了最终结果 <单词,数量>
JavaPairRDDres = sorted.mapToPair(new PairFunction , String, Integer>() { @Override public Tuple2 call(Tuple2 tuple2) throws Exception { return tuple2.swap(); } }); System.out.println(res.collect()); res.saveAsTextFile("out1"); jsc.stop(); }}复制代码