博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
好程序员大数据教程:SparkShell和IDEA中编写Spark程序
阅读量:5983 次
发布时间:2019-06-20

本文共 4904 字,大约阅读时间需要 16 分钟。

好程序员大数据教程:SparkShell和IDEA中编写Spark程序,spark-shell是Spark自带的交互式Shell程序,方便用户进行交互式编程,用户可以在该命令行下用Scala编写Spark程序。spark-shell程序一般用作Spark程序测试练习来用。spark-shell属于Spark的特殊应用程序,我们可以在这个特殊的应用程序中提交应用程序

spark-shell启动有两种模式,local模式和cluster模式,分别为

local模式:

spark-shell

local模式仅在本机启动一个SparkSubmit进程,没有与集群建立联系,虽然进程中有SparkSubmit但是不会被提交到集群红

Cluster模式(集群模式):

spark-shell \

--master spark://hadoop01:7077 \
--executor-memory 512m \
--total-executor-cores 1
后两个命令不是必须的 --master这条命令是必须的(除非在jar包中已经指可以不指定,不然就必须指定)

退出shell

千万不要ctrl+c spark-shell 正确退出 :quit 千万不要ctrl+c退出 这样是错误的 若使用了ctrl+c退出 使用命令查看监听端口 netstat - apn | grep 4040 在使用kill -9 端口号 杀死即可

3.25.11 spark2.2shell和spark1.6shell对比

ps:启动spark-shell若是集群模式,在webUI会有一个一直执行的任务

通过IDEA创建Spark工程

ps:工程创建之前步骤省略,在scala中已经讲解,直接默认是创建好工程的

对工程中的pom.xml文件配置

<!-- 声明公有的属性 -->

<properties>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<encoding>UTF-8</encoding>
<scala.version>2.11.8</scala.version>
<spark.version>2.2.0</spark.version>
<hadoop.version>2.7.1</hadoop.version>
<scala.compat.version>2.11</scala.compat.version>
</properties>

<!-- 声明并引入公有的依赖 -->

<dependencies>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>${hadoop.version}</version>
</dependency>
</dependencies>

Spark实现WordCount程序

Scala版本

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object SparkWordCount {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("dri/wordcount").setMaster("local[*]")

//创建sparkContext对象

val sc = new SparkContext(conf)

//通过sparkcontext对象就可以处理数据

//读取文件 参数是一个String类型的字符串 传入的是路径

val lines: RDD[String] = sc.textFile(“dir/wordcount”)

//切分数据

val words: RDD[String] = lines.flatMap(_.split(" "))

//将每一个单词生成元组 (单词,1)

val tuples: RDD[(String, Int)] = words.map((_,1))

//spark中提供一个算子 reduceByKey 相同key 为一组进行求和 计算value

val sumed: RDD[(String, Int)] = tuples.reduceByKey(_+_)

//对当前这个结果进行排序 sortBy 和scala中sotrBy是不一样的 多了一个参数

//默认是升序 false就是降序

val sorted: RDD[(String, Int)] = sumed.sortBy(_._2,false)

//将数据提交到集群存储 无法返回值

sorted.foreach(println)

//回收资源停止sc,结束任务

sc.stop()
}
}

Java版本

import org.apache.spark.SparkConf;

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
public class JavaWordCount {
public static void main(String[] args) {

//1.先创建conf对象进行配置主要是设置名称,为了设置运行模式

SparkConf conf = new SparkConf().setAppName("JavaWordCount").setMaster("local");

//2.创建context对象

JavaSparkContext jsc = new JavaSparkContext(conf);
JavaRDD<String> lines = jsc.textFile("dir/file");

//进行切分数据 flatMapFunction是具体实现类

JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
@Override
public Iterator<String> call(String s) throws Exception {
List<String> splited = Arrays.asList(s.split(" "));
return splited.iterator();
}
});

//将数据生成元组

//第一个泛型是输入的数据类型 后两个参数是输出参数元组的数据

JavaPairRDD
tuples = words.mapToPair(new PairFunction
() { @Override public Tuple2
call(String s) throws Exception { return new Tuple2
(s, 1); } });复制代码
//聚合

JavaPairRDD
sumed = tuples.reduceByKey(new Function2
() {
复制代码

@Override

//第一个Integer是相同key对应的value

//第二个Integer是相同key 对应的value

public Integer call(Integer v1, Integer v2) throws Exception {                return v1 + v2;            }        });复制代码
//因为Java api没有提供sortBy算子,此时需要将元组中的数据进行位置调换,然后在排序,排完序在换回

//第一次交换是为了排序

JavaPairRDD
swaped = sumed.mapToPair(new PairFunction
, Integer, String>() { @Override public Tuple2
call(Tuple2
tup) throws Exception { return tup.swap(); } });复制代码
//排序

JavaPairRDD
sorted = swaped.sortByKey(false);复制代码
//第二次交换是为了最终结果 <单词,数量>

JavaPairRDD
res = sorted.mapToPair(new PairFunction
, String, Integer>() { @Override public Tuple2
call(Tuple2
tuple2) throws Exception { return tuple2.swap(); } }); System.out.println(res.collect()); res.saveAsTextFile("out1"); jsc.stop(); }}复制代码

转载地址:http://smgox.baihongyu.com/

你可能感兴趣的文章
由一条微博引发的 — Xcode LLDB 调试断点总结
查看>>
Android NDK开发扫盲及最新CMake的编译使用
查看>>
Weex开发系列(一):初识Weex
查看>>
开源 UI 库中,唯一同时实现了大表格虚拟化和树表格的 Table 组件
查看>>
找到思聪王
查看>>
[译] 学习 Spring Security(五):重发验证邮件
查看>>
快速的React Native开发方法
查看>>
rabbitmq中文教程python版 - 工作队列
查看>>
SpringBoot 1024行代码 - Eureka Server
查看>>
服务器时区问题
查看>>
JAVA反射技术应用-ReflectUtil
查看>>
removeGeneratedClassFiles Failed
查看>>
nagios安装全攻略
查看>>
Perl进阶知识点(2)
查看>>
Android adb.exe 启动失败
查看>>
我的友情链接
查看>>
使用JavaMail完成邮件的编写
查看>>
Xcode8修改或者新建的XIB文件 xcode7上报错问题
查看>>
MSSQL获取昨天,本周,本月。。。
查看>>
记录:我的大学的最后时光(大三下学期 _11
查看>>