Spark Streaming(5):Spark Window function in Java

Stella981
• 阅读 645

首先,看下window函数的图解:

Spark Streaming(5):Spark Window function in Java

下面这个代码是计算一分钟之内的单词数量统计,每两秒获取一次数据,同时处理数据时间也是两秒,窗口大小为1分钟

1.数据源

package com.ssm.test;

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.ServerSocket;
import java.net.Socket;

public class SocketTest {

    public static void main(String[] args) {
        try{
            ServerSocket server=new ServerSocket(9999);
            System.out.println("The Server has started");
            
            Socket socket=server.accept();
            System.out.println("There was a socket client connected");
            
            BufferedReader br=new BufferedReader(new InputStreamReader(System.in));
            String line=br.readLine();
            PrintWriter writer=new PrintWriter(socket.getOutputStream());
            
            while(!line.equals("end")){
                writer = new PrintWriter(socket.getOutputStream());
                writer.println("If you are not brave enough, no one will back you up.");
                writer.flush();
                System.out.println(System.currentTimeMillis() + "\nServer send:\t"+line);
                Thread.sleep(2000);
            }
            writer.close();
            socket.close();
            server.close();
        }catch(Exception e) {
            e.printStackTrace();
        }
    }
}

2.spark streaming处理代码

package com.ssm.test;

import java.util.Arrays;
import java.util.Iterator;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

import scala.Tuple2;

public class WordsCountTest {

    public static void main(String[] args) throws Exception {
        
        SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("WordsCount");
        JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(2));
        JavaDStream<String> lines = jssc.socketTextStream("127.0.0.1", 9999).window(new Duration(60000));
        
        lines.flatMap(new FlatMapFunction<String, String>(){
            public Iterator<String> call(String x){
                return Arrays.asList(x.split(" ")).iterator();
            }
        }).mapToPair(new PairFunction<String, String, Integer>(){
            public Tuple2<String, Integer> call(String s){
                return new Tuple2<String, Integer>(s, 1);
            }
        }).reduceByKey(new Function2<Integer, Integer, Integer>(){
            public Integer call(Integer i1, Integer i2){
                return i1+i2;
            }
        }).print();
        
        jssc.start();
        jssc.awaitTermination();

    }

}
点赞
收藏
评论区
推荐文章
Karen110 Karen110
3年前
一篇文章带你了解JavaScript时间
一、前言setTimeout(function,milliseconds)在等待指定的毫秒数后执行函数。setInterval(function,milliseconds)setTimeout()相同,但会重复执行。二、时间事件窗口对象允许在指定的时间间隔执行代码。时间间隔称为定时事件。1\.setTimeout()方法window.set
待兔 待兔
2个月前
手写Java HashMap源码
HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程22
个推分享Spark性能调优指南:性能提升60%↑ 成本降低50%↓
前言Spark是目前主流的大数据计算引擎,功能涵盖了大数据领域的离线批处理、SQL类处理、流式/实时计算、机器学习、图计算等各种不同类型的计算操作,应用范围与前景非常广泛。作为一种内存计算框架,Spark运算速度快,并能够满足UDF、大小表Join、多路输出等多样化的数据计算和处理需求。作为国内专业的数据智能服务商,个推从早期的1.3版本便引入Spark,
Wesley13 Wesley13
2年前
java——20171121
!(http://a.51jsoft.com/uploads/default/original/1X/c542896b094a42a5653fb75adf6cdacd6e35d12e.png)!(https://static.oschina.net/uploads/space/2017/1121/210719_G80Z_3715033.png)
Wesley13 Wesley13
2年前
Java日期时间API系列31
  时间戳是指格林威治时间1970年01月01日00时00分00秒起至现在的总毫秒数,是所有时间的基础,其他时间可以通过时间戳转换得到。Java中本来已经有相关获取时间戳的方法,Java8后增加新的类Instant等专用于处理时间戳问题。 1获取时间戳的方法和性能对比1.1获取时间戳方法Java8以前
Stella981 Stella981
2年前
SparkSql学习1 —— 借助SQlite数据库分析2000万数据
总所周知,Spark在内存计算领域非常强势,是未来计算的方向。Spark支持类Sql的语法,方便我们对DataFrame的数据进行统计操作。但是,作为初学者,我们今天暂且不讨论Spark的用法。我给自己提出了一个有意思的思维游戏:Java里面的随机数算法真的是随机的吗?好,思路如下:1\.取样,利用Java代码随机生成2000万条01
Stella981 Stella981
2年前
Spark Streaming和Kafka集成深入浅出
写在前面本文主要介绍SparkStreaming基本概念、kafka集成、Offset管理本文主要介绍SparkStreaming基本概念、kafka集成、Offset管理一、概述Spark Streaming顾名思义是spark的流式处理框架,是面向海量数据实现高吞吐量、高可用的分布式实时计算。关于spark的安装可以参考Spa
Stella981 Stella981
2年前
Spark Python 快速体验
Spark是2015年最受热捧大数据开源平台,我们花一点时间来快速体验一下Spark。Spark技术栈!(http://static.oschina.net/uploads/space/2016/0331/103717_d6qN_1450051.png)如上图所示,Spark的技术栈包括了这些模块:核心模块:Spark
Stella981 Stella981
2年前
Flink编程练习
\TOC\1.wordcount利用socket作为数据源,对输入的每行数据进行单词计数。计算频率为processtime的每10秒一次,结果输出到terminal。objectSocketWindowWordCount{defmain(args:ArrayString):U
Stella981 Stella981
2年前
JOptionPane修改图标
1.在Linux平台下.JOptionPane会显示Java默认的图标,在window平台不显示图标,如何替换这个图标了?2JOptionPane.setIcon(Icon)修改的是内容区域的icon,而不是左上角的Icon.所以需要通过修改Jdialog/Frame的图标来达到修改默认图标的问题.3.代码:if(JOptio