Skip to content

wifi嗅探与客流统计

iHTCboy edited this page Mar 30, 2019 · 22 revisions

##一、wifi嗅探原理浅析 现在的手机都带有wifi功能,它平时总是在不停扫描周边的wifi热点。所谓扫描,就是发送下文中提到的“探查请求”信号。wifi嗅探器(或称wifi探针),不停地接收这类信号,并转换后发送到后台服务器。wifi嗅探是无线路由器的基本功能,只是一般的无线路由器不会把嗅探信号发送到后台服务器。
IEEE有个mac地址数据库可以根据mac地址查询设备的生产商。

##二、中科爱讯的wifi嗅探产品简介 ####TZ001只支持PC,它通过USB插在PC上。
PC安装它带的驱动程序后,在windows设备管理器上wifi探针显示为一个串口设备(如COM4或COM5)。需要自己写客户端程序接收来自串口的信号,并通过PC的网络发送到服务器端。88元。

####TZ006支持PC或安卓手机,通过USB口插在PC或手机(需转换头)。
TZ006不通过PC的网络传送上行数据,而是通过wifi热点。通过配置程序来设置通过哪个wifi热点来传输上行数据。上传的协议是UDP,提供了java/PHP的服务器端UDP接收数据的源码,几十行代码就可以。78元。

####TZ007是TZ006的增强,带有双wifi模块。
TZ006只带了一个wifi模块,不得不在嗅探模式和上传模式间切换。导致上行数据不是连续的,默认50秒上传一次数据。TZ007和TZ001的上传数据是连续的,只是一个通过PC的网络,一个通过wifi热点。198元。

####TZ002是TZ001的增强,可以直接插上网线。
相当于自带了一个小PC,并装好了驱动和上传程序。288元。

我们选择了TZ001作为烟草零售POS的配套wifi探针。需要云pos客户端从USB读数据,使用UDP协议上传到云端。 github目录/wifi-sniffer下保存有TZ001的配套软件。

##三、对TZ001的测试 将TZ001插入PC的USB口,需要安装USB转串口的驱动程序。执行CP210xVCPInstaller_x64.exe,安装窗口提示:
Welcome to the CP201X USB to UART Bridge Driver Installer

安装成功后在设备管理器中出现新的设备:

端口(COM和LTP)
 - Silicon Labs CP210x USB to UART Bridge(COM5)

putty是一个通用的终端软件,它支持从串口读数据。打开putty,在Session选项中:
connection type选Serial(默认SSH),Serial line输入COM5,Speed输入115200,Saved Sessions输入wifi-sniffer,点save按钮.

点击open按钮,出现黑窗口显示:

28:01:00:00:40:4A|70:F9:6D:34:BD:51|01|09|11|-75
80:56:F2:7B:47:1D|FF:FF:FF:FF:FF:FF|00|04|12|-88
B4:29:3D:01:11:06|FF:FF:FF:FF:FF:FF|00|04|1|-93
70:F9:6D:EA:95:D6|74:25:8A:8B:1D:31|02|00|1|-81
...

每一行都是探针侦测到的周边wifi设备发送的信号。
以竖线|分隔,第1列是wifi设备的mac地址;第2列是连接到的设备mac地址,一般是无线路由器,FF:FF:FF:FF:FF:FF表示尚未连上热点,如探测请求;第3列是帧大类:管理帧00、控制帧01、数据帧02;第4列是帧小类;第5列是信道(1-14);第6列是RSSI信号强度,最小是-100(表示最远)。

##四、wifi数据帧类型

参考这个文档

##五、TZ006测试 TZ006支持通过wifi上传数据。
下图是配置TZ006的截图。从图上可以看出,UDP服务器的地址是192.168.3.203。截图上没有参数还有:无线路由器的IP是192.168.3.1,wifi探针的IP是192.168.3.4(无线路由器自动分配的)。
UDP服务器就是下文中java代码实现的。通过java Recv执行后输出到屏幕上可以看到TZ006的上传数据:

48cd02
F4:8C:50:0A:B9:87|FF:FF:FF:FF:FF:FF|00|04|5|-80
E0:CB:4E:59:B0:5A|E0:CB:4E:59:B0:5C|02|00|6|-84
34:68:95:AD:05:49|FF:FF:FF:FF:FF:FF|00|04|8|-89
68:5D:43:F7:66:88|FF:FF:FF:FF:FF:FF|00|04|8|-86
28:E3:47:D2:0E:38|FF:FF:FF:FF:FF:FF|00|04|10|-88
2C:D0:5A:FC:97:A8|70:F9:6D:34:BD:51|01|09|11|-71
BC:77:37:43:F9:0E|FF:FF:FF:FF:FF:FF|00|04|1|-84
38:29:5A:64:B6:59|FF:FF:FF:FF:FF:FF|00|04|3|-87
38:71:DE:2B:0B:D9|74:E5:0B:8D:FC:19|01|09|6|-79
F0:D5:BF:4A:60:82|FF:FF:FF:FF:FF:FF|00|04|7|-90
 Mar 1, 2017 8:34:59 AM
48cd02
14:2D:27:8E:84:ED|FF:FF:FF:FF:FF:FF|00|04|9|-85
 Mar 1, 2017 8:34:59 AM
48cd02
30:10:B3:94:B3:40|FF:FF:FF:FF:FF:FF|00|04|2|-83
68:3E:34:9E:20:A1|FF:FF:FF:FF:FF:FF|00|04|7|-78
08:3E:8E:2F:D2:95|B8:08:D7:71:CB:90|02|00|6|-49
B8:08:D7:71:CB:88|B8:08:D7:71:CB:90|02|08|5|-46
 Mar 1, 2017 8:35:56 AM

48cd02是wifi探针的设备id。“ Mar 1, 2017 8:34:59 AM”是上传时间。注意Mar前面有个空格,估计空格是个标志,用来表明这一行是时间。
在上面的范例数据中,上传间隔是50秒。如果上传的数据超过10个mac地址,会拆分为10个mac一组。

在linux下发送udp请求

如果往本地UDP端口發送數據,那麼可以使用以下命令:

echo “hello” > /dev/udp/10.10.11.86/9002

意思是往本地192.168.1.81的5060端口發送數據包hello。

如果往遠程UDP端口發送數據,那麼可以使用以下命令:

$ apt install socat
$ echo “hello” | socat - udp4-datagram:10.10.11.86:9002

##六、java实现的udp server 默认监听端口9002,如果要监听其他端口,如8888,可以这样执行:

$ java Recv 8888

java源码:

import java.io.IOException;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.net.SocketException;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;


public class Recv {

        public static final int DEFAULT_PORT = 9002;
        public static final int MAX_MSG_LEN = 1600;

        public static ExecutorService dataHandlePool = Executors
                        .newFixedThreadPool(64);


        public static void start(int port) {
                try {
                        DatagramSocket udp = new DatagramSocket(port);
                        DatagramPacket dPacket;
                        byte[] echo = new byte[1];
                        echo[0] = (byte)1;
                        while (true) {
                                dPacket = new DatagramPacket(new byte[MAX_MSG_LEN], MAX_MSG_LEN);
                                udp.receive(dPacket);
                                String result = new String(dPacket.getData(),0,dPacket.getLength());
                                System.out.println(result + " " + new Date(System.currentTimeMillis()).toLocaleString());
                                //返回一个字节给探针设备
                                InetAddress addr = dPacket.getAddress();
                                dPacket = new DatagramPacket(echo, echo.length);
                                dPacket.setAddress(addr);
                                udp.send(dPacket);
                                }

                } catch (SocketException e) {
                        e.printStackTrace();
                } catch (IOException e) {
                        e.printStackTrace();
                } catch (Exception e) {
                        e.printStackTrace();
                }
        }

        public static void main(String[] args) {
                if (args != null && args.length == 1) {
                        start(Integer.parseInt(args[0]));
                }else {
                        start(DEFAULT_PORT);
                }
        }
}                                        

编译上述Recv.java,然后用linux echo命令测试Recv.class:

$ javac Recv.java
$ java Recv &
$ echo “hello” > /dev/udp/127.0.0.1/9002
“hello”
 Feb 28, 2017 5:50:03 AM

##七、客流统计需求分析 wifi探针TZ001是中科爱讯的产品,中科爱讯也提供客流统计的云服务,叫每时每客。每时每客九项核心数据: ####1.总客流量 到达店铺或区域的整体客流量。应是以日/周/月为单位统计,应为“人次”数。如果一个人一天进店两次,算两个人?

####2.驻店时长 进入店铺顾客在店内停留的时长。 也是按“人次”。如果一个人进来两次(时间间隔超过阈值,如5分钟),两次的时长不应叠加。

####3.进店跳出量 进入店铺后离开店铺的顾客流量。

####4.多店铺管理 多个店铺数据实时统计,数据统一管理。 多个店铺的数据合并,视为一个大店铺?

####5.新老顾客量 在某一时段内第一次/多次以上进入店铺的顾客流量

####6.进店客流量 进入店铺的客流量。包括在店的和离开的?

####7.在店客流量 当前在店铺的顾客流量

####8.老顾客增长量 在某一时段内二次进入店铺的顾客增长数量

####9.老顾客流失量 在某一时段内多次进入店铺的顾客流失数量

##八、可行性分析 客流统计的实现可分为两大部分:

  • 流计算:处理wifi探测数据,识别出顾客进店、离店事件,更新在店人数
  • 批量计算:处理当天mac地址,识别出他是新顾客还是老顾客

wifi探针给的样例代码使用UDP协议上传数据到云端。UDP协议比TCP协议轻量,但不保证消息能发送成功。对于UDP数据的处理有两种方案: 1.使用netty框架自己写java程序实现。计算出顾客“在店数”写入redis,计算出“进店”和“离店”事件写入kafka。 2.wifi探针数据用logstash导入kafka,用spark集群处理探针数据,计算出“在店数”写入redis,计算出“进店”和“离店”事件写入kafka。

目前选择用logstash转换UDP协议到kafka的方案。

客户端实现

云pos客户端实现delphi实现。wifi探针客户端程序也可用delphi编写,甚至可以把USB转串口的驱动程序打入到云POS安装程序中。
客户端读取串口的wifi探测数据,进行UDP打包,格式参照了TZ006。只是TZ006是批量上传,而设想的delphi实现是实时上传数据。这样,导致每次只上传一条wifi探测数据。具体的数据格式类似于:

48cd02|14:2D:27:8E:84:ED|FF:FF:FF:FF:FF:FF|00|04|9|-85|Mar 1, 2017 8:34:59 AM

开始是探针id,然后是来自串口的wifi探测数据,最后是时间。

####NGINX实现UDP负载均衡 nginx支持UDP的负载均衡,可选择默认的按IP的负载均衡策略(来自同一IP的数据会被反向代理到同一个应用)。这是最常见负载均衡策略,可支持会话粘连。nginx实现的UDP负载均衡配置:

stream {
    server {
        listen 9003 udp;
        proxy_pass wifi_sniffer_backend;
        proxy_responses 0;
    }
    upstream wifi_sniffer_backend {
        server 127.0.0.1:9002;
        ...  (其他的UDP应用)
    }
}

在本地启动UDP测试应用,并发送“hello”到9003端口,:

$ java Recv 9002 &
$ echo “hello” | socat - udp4-datagram:127.0.0.1:9003
“hello”
 Mar 1, 2017 8:44:32 AM

发送到9003端口的数据被NGINX反向代理到了9002端口,并被Recv类输出到了屏幕。(如果不在命令的最后加上&符号,需要另外再开一个终端。) ####logstash处理UDP logstash有个UDP输入插件,可以解析UDP数据。logstash的输出插件选择kafka。通过logstash可以根据UDP数据生成kafka消息。
使用logstash的codec插件将消息的格式转换为json或其他大数据平台容易处理的格式。UDP数据格式类似:

48cd02|14:2D:27:8E:84:ED|FF:FF:FF:FF:FF:FF|00|04|9|-85|Mar 1, 2017 8:34:59 AM

开始是探针id,然后是来自串口的wifi探测数据,最后是时间。 losgstash的配置文件:

input{
    udp{
        port=>9999 #监听本机的9999端口,接收udp信息
    }
}
filter{
        mutate{
                split=>{"message"=>"|"}     #按照字符”|”分隔,并为每个分隔的数据设置field
                add_field =>{ "COL1"=> "%{[message][0]}" }
                add_field =>{ "COL2"=> "%{[message][1]}" }
                add_field =>{ "COL3"=> "%{[message][2]}" }
                add_field =>{ "COL4"=> "%{[message][3]}" }
                add_field =>{ "COL5"=> "%{[message][4]}" }
                add_field =>{ "COL6"=> "%{[message][5]}" }
                add_field =>{ "COL7"=> "%{[message][6]}" }
                add_field =>{ "COL8"=> "%{[message][7]}" }
                remove_field => ["message"]
        }
}
output{
    stdout{
         codec=>rubydebug
    }
    file{
        path=>"/opt/logstash/file/wifi_sniffer.log"
    }
    kafka{
         topic_id => "wifi_sniffer"
         bootstrap_servers => "10.0.14.205:6667,10.0.14.206:6667,10.0.14.207:6667"
    }
}

####java直接处理UDP数据 用java直接处理UDP数据也不难,只是对框架要求高。需要用到netty框架。
Java处理UDP数据的设想:
在java内存中维护一个hashmap,key是“探针id+mac地址”,value是“创建时间+计数+更新时间”。当店里“没人”时,清单是空的。假定每个店里只有一个探针。

【主线程】:
开始处理UDP数据,解析出一个mac地址,看内存中同一个探针名下是否存在这个mac地址:

  • 存在相同mac,“计数”加一,“更新时间”为当前时间,更新“信号强度”
  • 不存在相同mac,向内存添加一条记录。向队列(kafka)添加一个“进店”的消息。redis中的“在店人数”加一。

【定时线程(间隔10秒?)】:
扫描内存中的数据,看是否有的手机(按mac)已经很长时间(如5分钟)没发送wifi信号了。如果存在这样的记录,就把它从内存中移除。向kafka队列中添加一个“离店”的消息。

mac地址清单

每个店都需要维护一份历史mac地址清单。key是“探针id+mac地址”。属性有首次探测时间、最后探测时间、进店次数。
这个mac地址的数量值很多,并不适合放在关系数据库中。可以考虑放在HDFS中。

####批量计算 MapReduce(MR)的一个重要目标是计算出一个顾客是新顾客还是老顾客。两个输入:1)当天的客流记录;2)历史mac地址。两个输入进行比对,如果客流记录中的mac地址在“历史mac地址清单”中存在是老顾客,否则是新顾客。

输出:1)更新“历史mac地址”;2)汇总“客流记录”到“日报”中

####数据存储

  • 历史mac地址(HDFS)
    保存了所有探针历史上曾经检测到的mac地址清单,key是“探针id+mac地址”。值有首次探测时间、最后探测时间、进店次数
  • 客流记录(redis)
    探针id、mac地址、进入时间、离开时间、停留时间、计数
  • 在店人数(redis)
    探针id、mac地址、进入时间、计数
  • 日报(mysql)
    日客流、新顾客数、老顾客数、停留时长
  • 月报(mysql)

##九、规划方案 ####整体架构 上图中:spark流计算的输出:1.在店人数写入redis;2.进店/出店消息写入kafka。批量计算的输入:1.kafka中的出店消息;2.历史mac地址清单。批量计算的输出:1.更新历史mac地址;2.日报写入mysql。
前端应用负责渲染出网页,将redis、mysql中的数据展现给店主看。还需要开发出展现全市数据的网页。