本文主要包括:
- 布隆过滤器原理
- 布隆过滤器使用案例
布隆过滤器原理
布隆过滤器(Bloom Filter)是1970年由布隆提出的,它实际上是由一个很长的二进制向量和一系列随意映射函数组成。
它是一种基于概率的数据结构,主要用来判断某个元素是否在集合内,它具有运行速度快(时间效率),占用内存小的优点(空间效率),但是有一定的误识别率和删除困难的问题。它能够告诉你某个元素一定不在集合内或可能在集合内。
算法:
- 首先需要k个hash函数,每个函数可以把key散列成为1个整数
- 初始化时,需要一个长度为n比特的数组,每个比特位初始化为0
- 某个key加入集合时,用k个hash函数计算出k个散列值,并把数组中对应的比特位置为1
- 判断某个key是否在集合时,用k个hash函数计算出k个散列值,并查询数组中对应的比特位,如果所有的比特位都是1,认为在集合中。
优点:
不需要存储key,节省空间
缺点:
- 算法判断key在集合中时,有一定的概率key其实不在集合中
- 无法删除
名词解释
False Position
集合里没有某元素,查找结果是有该元素。
也就是误判,这种情况在布隆过滤器中可能会出现。False Negative
集合里有某元素,查找结果是没有该元素。
也就是少判,这种情况在布隆过滤器中一定不会出现
布隆过滤器只会多判不会少判。宁可错杀不可放过
想用布隆过滤器判断元素不存在,这个概率不是100%,
布隆过滤器认为不存在的情况, 确实是一定不会存在,但是,还有可能原本本身不存在,但是它会认为它存在的,比实际的要少
Bloom Filter不会动态增长,运行过程中维护的始终只是m位的bitset,所以空间复杂度只有O(m);
Bloom Filter的插入与属于操作主要都是在计算k个hash,所以都是O(k)。
个人感悟:
为什么布隆过滤器需要多个hash函数?
因为不同的key的hash值有可能会一样,这样误判的概率会很大,但是如果通过多个hash函数来计算,那么数据误判的概率就会低很多了
为什么布隆过滤器不可以删除?
如果布隆过滤器把其中一个key的值删除了,也就是把数组的值置为0了,另一个key的hash值有可能也被删除了。所以不可以删除
具体可以参考布隆过滤器,原理+案例+代码实现
测试案例:
手写java代码,测试布隆过滤器误判情况
int BF_CARDINAL_THRESHOLD = 100000; double BF_FALSE_POSITIVE_RATE = 0.1; BloomFilter<Integer> subOrderFilter = BloomFilter.create(Funnels.integerFunnel(), BF_CARDINAL_THRESHOLD, BF_FALSE_POSITIVE_RATE); HashSet st = new HashSet(); for(int i = 0 ; i <= BF_CARDINAL_THRESHOLD; i ++){ if(subOrderFilter.mightContain(i)){ st.add(i); //其实不存在,但是布隆过滤器认为它存在,这就是误判了 } else { subOrderFilter.put(i); } } System.out.println("一共误判了:" + st.size());
如果使用这种方式来过滤,就会导致有一部分数据其实没有出现过,但是也会被过滤掉
在flink程序中使用布隆过滤器实时过滤
这种情况下,会丢数据,其实感觉没啥意义。。。
package com.hzw.bigdata.flinkstudy;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.shaded.guava18.com.google.common.hash.BloomFilter;
import org.apache.flink.shaded.guava18.com.google.common.hash.Funnels;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import java.nio.charset.StandardCharsets;
/**
* 使用布隆过滤器实时去重
*/
public class FlinkBloomFilterDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStreamSource<String> source = env.socketTextStream("localhost", 9999);
SingleOutputStreamOperator<String> stream1 = source.flatMap(new FlatMapFunction<String, String>() {
@Override
public void flatMap(String s, Collector<String> collector) throws Exception {
String[] arr = s.split(",");
for(String str : arr){
collector.collect(str);
}
}
})
.keyBy(x -> x)
.process(new SubOrderDeduplicateProcessFunc())
;
stream1.print();
env.execute();
}
// 去重用的ProcessFunction
public static final class SubOrderDeduplicateProcessFunc extends ProcessFunction<String, String> {
private static final long serialVersionUID = 1L;
private static final int BF_CARDINAL_THRESHOLD = 10;
private static final double BF_FALSE_POSITIVE_RATE = 0.1;
private volatile BloomFilter<String> subOrderFilter;
@Override
public void open(Configuration parameters) throws Exception {
subOrderFilter = BloomFilter.create(Funnels.stringFunnel(StandardCharsets.UTF_8), BF_CARDINAL_THRESHOLD, BF_FALSE_POSITIVE_RATE);
}
@Override
public void processElement(String s, Context context, Collector<String> collector) throws Exception {
String subOrderId = s;
if (!subOrderFilter.mightContain(subOrderId)) {
subOrderFilter.put(subOrderId);
collector.collect(s);
}
}
@Override
public void close() throws Exception {
subOrderFilter = null;
}
}
}
- 使用布隆过滤器防止缓存击穿
- *不在布隆过滤器中的元素一定不存在数据库中。**
利用布隆过滤器的这个特点可以解决缓存穿透的问题,在服务启动的时候先把数据的查询条件,例如数据的 ID 映射到布隆过滤器上,当然如果新增数据时,除了写入到数据库中之外,也需要将数据的ID存入到布隆过滤器中。
我们在查询某条数据时,先判断这个查询的 ID 是否存在布隆过滤器中,如果不存在就直接返回空值,而不需要继续查询数据库和缓存,存在布隆过滤器中才继续查询数据库和缓存,这样就解决缓存穿透的问题。