Kafka +深度学习+ MQTT搭建可扩展的物联网平台【附源码】

Stella981
• 阅读 559

问题导读

1.本项目包含哪些源码?
2.本文使用了哪些框架?
3.KSQL UDF如何实现?

Kafka +深度学习+ MQTT搭建可扩展的物联网平台【附源码】

文中链接查看,可点击:阅读原文

物联网+大数据+机器学习将会是以后的趋势,这里介绍一篇这方面的文章包含源码。

混合机器学习基础架构构建了一个场景,利用Apache Kafka作为可扩展的中枢神经系统。 公共云用于极大规模地训练分析模型(例如,通过Google ML Engine在Google Cloud Platform(GCP)上使用TensorFlow和TPU,预测(即模型推断)在本地Kafka基础设施的执行( 例如,利用Kafka Streams或KSQL进行流分析)。

本文重点介绍内部部署。 创建了一个带有KSQL UDF的Github项目,用于传感器分析。 它利用KSQL的新API功能,使用Java轻松构建UDF / UDAF函数,对传入事件进行连续流处理。

使用案例:Connected Cars - 使用深度学习的实时流分析

从连接设备(本例中的汽车传感器)连续处理数百万个事件:
Kafka +深度学习+ MQTT搭建可扩展的物联网平台【附源码】

为此构建了不同的分析模型。 他们在公共云上接受TensorFlow,H2O和Google ML Engine的训练。 模型创建不是此示例的重点。 最终模型已经可以投入生产,可以部署用于实时预测。

模型服务可以通过模型server 完成,也可以本地嵌入到流处理应用程序中。 参阅RPC与流处理的权衡,以获得模型部署和....

演示:使用MQTT,Kafka和KSQL在Edge进行模型推理

Github项目:深度学习+KSQL UDF 用于流式异常检测MQTT物联网传感器数据
(下载源码: Kafka +深度学习+ MQTT搭建可扩展的物联网平台【附源码】  ksql-udf-deep-learning-mqtt-iot-master.zip (474.64 KB, 下载次数: 0) )
该项目的重点是通过MQTT将数据提取到Kafka并通过KSQL处理数据:

Kafka +深度学习+ MQTT搭建可扩展的物联网平台【附源码】 Confluent MQTT Proxy的一大优势是无需MQTT Broker即可实现物联网方案的简单性。 可以通过MQTT代理将消息直接从MQTT设备转发到Kafka。 这显着降低了工作量和成本。 如果你“只是”想要在Kafka和MQTT设备之间进行通信,这是一个完美的解决方案。

如果你想看到另一部分(与Elasticsearch / Grafana等接收器应用程序集成),请查看Github项目“KSQL for streaming IoT data”。 这实现了通过Kafka Connect和Elastic连接器与ElasticSearch和Grafana的集成。(源码下载:链接: https://pan.baidu.com/s/1FCFgAoF9v1ihp9fyqHeKag 密码: 67sz)

KSQL UDF - 源代码

开发UDF非常容易。 只需在UDF类中的一个Java方法中实现该函数:

[Bash shell] 纯文本查看 复制代码

?

1

@Udf(description = "apply analytic model to sensor input" )             public String anomaly(String sensorinput){ "YOUR LOGIC" }

这里是所有代码:

[Java] 纯文本查看 复制代码

?

01

02

03

04

05

06

07

08

09

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

package com.github.megachucky.kafka.streams.machinelearning;

import java.util.Arrays;

import hex.genmodel.GenModel;

import hex.genmodel.easy.EasyPredictModelWrapper;

import hex.genmodel.easy.RowData;

import hex.genmodel.easy.exception.PredictException;

import hex.genmodel.easy.prediction.AutoEncoderModelPrediction;

import io.confluent.ksql.function.udf.Udf;

import io.confluent.ksql.function.udf.UdfDescription;

@UdfDescription (name = "anomaly" , description = "anomaly detection using deep learning" )

public class Anomaly {

// Model built with H2O R API:

// anomaly_model <- h2o.deeplearning(x = names(train_ecg),training_frame =

// train_ecg,activation = "Tanh",autoencoder = TRUE,hidden =

// c(50,20,50),sparse = TRUE,l1 = 1e-4,epochs = 100)

// Name of the generated H2O model

private static String modelClassName = "io.confluent.ksql.function.udf.ml"

+ ".DeepLearning_model_R_1509973865970_1" ;

@Udf (description = "apply analytic model to sensor input" )

public String anomaly(String sensorinput) {

System.out.println( "Kai: DL-UDF starting" );

GenModel rawModel;

try {

rawModel = (hex.genmodel.GenModel) Class.forName(modelClassName).newInstance();

EasyPredictModelWrapper model = new EasyPredictModelWrapper(rawModel);

// Prepare input sensor data to be in correct data format for the autoencoder model (double[]):

String[] inputStringArray = sensorinput.split( "#" );

double [] doubleValues = Arrays.stream(inputStringArray)

.mapToDouble(Double::parseDouble)

.toArray();

RowData row = new RowData();

int j = 0 ;

for (String colName : rawModel.getNames()) {

row.put(colName, doubleValues[j]);

j++;

}

AutoEncoderModelPrediction p = model.predictAutoEncoder(row);

// System.out.println("original: " + java.util.Arrays.toString(p.original));

// System.out.println("reconstructedrowData: " + p.reconstructedRowData);

// System.out.println("reconstructed: " + java.util.Arrays.toString(p.reconstructed));

double sum = 0 ;

for ( int i = 0 ; i < p.original.length; i++) {

sum += (p.original[i] - p.reconstructed[i]) * (p.original[i] - p.reconstructed[i]);

}

// Calculate Mean Square Error => High reconstruction error means anomaly

double mse = sum / p.original.length;

System.out.println( "MSE: " + mse);

String mseString = "" + mse;

return (mseString);

} catch (InstantiationException | IllegalAccessException | ClassNotFoundException e) {

System.out.println(e.toString());

} catch (PredictException e) {

System.out.println(e.toString());

}

return null ;

}

}

如何使用Apache Kafka和MQTT Proxy运行演示?

执行演示的所有步骤都在Github项目中描述。
你只需安装Confluent Platform,然后按照以下步骤部署UDF,创建MQTT事件并通过KSQL levera处理它们....
这里使用Mosquitto生成MQTT消息。 当然,也可以使用任何其他MQTT客户端。 这是开放和标准化协议的巨大好处。

到此结束,文章虽然简短,但是内容确实很丰富,特别项目的源码的阅读,在github上有详细的介绍。为方便阅读,微信点此可查看

https://github.com/kaiwaehner/ksql-udf-deep-learning-mqtt-iot

https://github.com/kaiwaehner/ksql-fork-with-deep-learning-function

原文有内嵌链接。

推荐关注Java大数据编程学习,第一时间,分享更多干货,欢迎转发和分享。

Kafka +深度学习+ MQTT搭建可扩展的物联网平台【附源码】

本文分享自微信公众号 - web项目开发(javawebkaifa)。
如有侵权,请联系 support@oschina.cn 删除。
本文参与“OSC源创计划”,欢迎正在阅读的你也加入,一起分享。

点赞
收藏
评论区
推荐文章
blmius blmius
2年前
MySQL:[Err] 1292 - Incorrect datetime value: ‘0000-00-00 00:00:00‘ for column ‘CREATE_TIME‘ at row 1
文章目录问题用navicat导入数据时,报错:原因这是因为当前的MySQL不支持datetime为0的情况。解决修改sql\mode:sql\mode:SQLMode定义了MySQL应支持的SQL语法、数据校验等,这样可以更容易地在不同的环境中使用MySQL。全局s
Easter79 Easter79
2年前
swap空间的增减方法
(1)增大swap空间去激活swap交换区:swapoff v /dev/vg00/lvswap扩展交换lv:lvextend L 10G /dev/vg00/lvswap重新生成swap交换区:mkswap /dev/vg00/lvswap激活新生成的交换区:swapon v /dev/vg00/lvswap
Jacquelyn38 Jacquelyn38
2年前
2020年前端实用代码段,为你的工作保驾护航
有空的时候,自己总结了几个代码段,在开发中也经常使用,谢谢。1、使用解构获取json数据let jsonData  id: 1,status: "OK",data: 'a', 'b';let  id, status, data: number   jsonData;console.log(id, status, number )
皕杰报表之UUID
​在我们用皕杰报表工具设计填报报表时,如何在新增行里自动增加id呢?能新增整数排序id吗?目前可以在新增行里自动增加id,但只能用uuid函数增加UUID编码,不能新增整数排序id。uuid函数说明:获取一个UUID,可以在填报表中用来创建数据ID语法:uuid()或uuid(sep)参数说明:sep布尔值,生成的uuid中是否包含分隔符'',缺省为
Wesley13 Wesley13
2年前
Java获得今日零时零分零秒的时间(Date型)
publicDatezeroTime()throwsParseException{    DatetimenewDate();    SimpleDateFormatsimpnewSimpleDateFormat("yyyyMMdd00:00:00");    SimpleDateFormatsimp2newS
Wesley13 Wesley13
2年前
mysql设置时区
mysql设置时区mysql\_query("SETtime\_zone'8:00'")ordie('时区设置失败,请联系管理员!');中国在东8区所以加8方法二:selectcount(user\_id)asdevice,CONVERT\_TZ(FROM\_UNIXTIME(reg\_time),'08:00','0
Wesley13 Wesley13
2年前
00:Java简单了解
浅谈Java之概述Java是SUN(StanfordUniversityNetwork),斯坦福大学网络公司)1995年推出的一门高级编程语言。Java是一种面向Internet的编程语言。随着Java技术在web方面的不断成熟,已经成为Web应用程序的首选开发语言。Java是简单易学,完全面向对象,安全可靠,与平台无关的编程语言。
Stella981 Stella981
2年前
Django中Admin中的一些参数配置
设置在列表中显示的字段,id为django模型默认的主键list_display('id','name','sex','profession','email','qq','phone','status','create_time')设置在列表可编辑字段list_editable
Wesley13 Wesley13
2年前
MySQL部分从库上面因为大量的临时表tmp_table造成慢查询
背景描述Time:20190124T00:08:14.70572408:00User@Host:@Id:Schema:sentrymetaLast_errno:0Killed:0Query_time:0.315758Lock_
Python进阶者 Python进阶者
3个月前
Excel中这日期老是出来00:00:00,怎么用Pandas把这个去除
大家好,我是皮皮。一、前言前几天在Python白银交流群【上海新年人】问了一个Pandas数据筛选的问题。问题如下:这日期老是出来00:00:00,怎么把这个去除。二、实现过程后来【论草莓如何成为冻干莓】给了一个思路和代码如下:pd.toexcel之前把这