0%

BloomFilter

本文主要包括:

  • 布隆过滤器原理
  • 布隆过滤器使用案例

布隆过滤器原理

布隆过滤器(Bloom Filter)是1970年由布隆提出的,它实际上是由一个很长的二进制向量和一系列随意映射函数组成。
它是一种基于概率的数据结构,主要用来判断某个元素是否在集合内,它具有运行速度快(时间效率),占用内存小的优点(空间效率),但是有一定的误识别率和删除困难的问题。它能够告诉你某个元素一定不在集合内或可能在集合内。

算法:

  1. 首先需要k个hash函数,每个函数可以把key散列成为1个整数
  2. 初始化时,需要一个长度为n比特的数组,每个比特位初始化为0
  3. 某个key加入集合时,用k个hash函数计算出k个散列值,并把数组中对应的比特位置为1
  4. 判断某个key是否在集合时,用k个hash函数计算出k个散列值,并查询数组中对应的比特位,如果所有的比特位都是1,认为在集合中。

优点:

不需要存储key,节省空间

缺点:

  1. 算法判断key在集合中时,有一定的概率key其实不在集合中
  2. 无法删除

名词解释

  • False Position
    集合里没有某元素,查找结果是有该元素。
    也就是误判,这种情况在布隆过滤器中可能会出现。

  • False Negative
    集合里有某元素,查找结果是没有该元素。
    也就是少判,这种情况在布隆过滤器中一定不会出现

布隆过滤器只会多判不会少判。宁可错杀不可放过
想用布隆过滤器判断元素不存在,这个概率不是100%,
布隆过滤器认为不存在的情况, 确实是一定不会存在,但是,还有可能原本本身不存在,但是它会认为它存在的,比实际的要少

Bloom Filter不会动态增长,运行过程中维护的始终只是m位的bitset,所以空间复杂度只有O(m);
Bloom Filter的插入与属于操作主要都是在计算k个hash,所以都是O(k)。

个人感悟:

为什么布隆过滤器需要多个hash函数?

因为不同的key的hash值有可能会一样,这样误判的概率会很大,但是如果通过多个hash函数来计算,那么数据误判的概率就会低很多了

为什么布隆过滤器不可以删除?

如果布隆过滤器把其中一个key的值删除了,也就是把数组的值置为0了,另一个key的hash值有可能也被删除了。所以不可以删除

具体可以参考布隆过滤器,原理+案例+代码实现

测试案例:

  • 手写java代码,测试布隆过滤器误判情况

    int BF_CARDINAL_THRESHOLD = 100000;
            double BF_FALSE_POSITIVE_RATE = 0.1;
            BloomFilter<Integer> subOrderFilter = BloomFilter.create(Funnels.integerFunnel(), BF_CARDINAL_THRESHOLD, BF_FALSE_POSITIVE_RATE);
            
            HashSet st = new HashSet();
            for(int i = 0 ; i <= BF_CARDINAL_THRESHOLD; i ++){
                if(subOrderFilter.mightContain(i)){
                    st.add(i);    //其实不存在,但是布隆过滤器认为它存在,这就是误判了
                } else {
                    subOrderFilter.put(i);
                }
            }
            System.out.println("一共误判了:" + st.size());

    如果使用这种方式来过滤,就会导致有一部分数据其实没有出现过,但是也会被过滤掉

  • 在flink程序中使用布隆过滤器实时过滤

    这种情况下,会丢数据,其实感觉没啥意义。。。

package com.hzw.bigdata.flinkstudy;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.shaded.guava18.com.google.common.hash.BloomFilter;
import org.apache.flink.shaded.guava18.com.google.common.hash.Funnels;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

import java.nio.charset.StandardCharsets;

/**
 * 使用布隆过滤器实时去重
 */
public class FlinkBloomFilterDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource<String> source = env.socketTextStream("localhost", 9999);

        SingleOutputStreamOperator<String> stream1 = source.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String s, Collector<String> collector) throws Exception {
                String[] arr = s.split(",");
                for(String str : arr){
                    collector.collect(str);
                }
            }
        })
                .keyBy(x -> x)
                .process(new SubOrderDeduplicateProcessFunc())
                ;
        stream1.print();
        env.execute();
    }
    // 去重用的ProcessFunction
    public static final class SubOrderDeduplicateProcessFunc extends ProcessFunction<String, String> {
        private static final long serialVersionUID = 1L;
        private static final int BF_CARDINAL_THRESHOLD = 10;
        private static final double BF_FALSE_POSITIVE_RATE = 0.1;

        private volatile BloomFilter<String> subOrderFilter;

        @Override
        public void open(Configuration parameters) throws Exception {
            subOrderFilter = BloomFilter.create(Funnels.stringFunnel(StandardCharsets.UTF_8), BF_CARDINAL_THRESHOLD, BF_FALSE_POSITIVE_RATE);
        }


        @Override
        public void processElement(String s, Context context, Collector<String> collector) throws Exception {
            String subOrderId = s;
            if (!subOrderFilter.mightContain(subOrderId)) {
                subOrderFilter.put(subOrderId);
                collector.collect(s);
            }
        }


        @Override
        public void close() throws Exception {
            subOrderFilter = null;
        }
    }
}
  • 使用布隆过滤器防止缓存击穿
  • *不在布隆过滤器中的元素一定不存在数据库中。**
    利用布隆过滤器的这个特点可以解决缓存穿透的问题,在服务启动的时候先把数据的查询条件,例如数据的 ID 映射到布隆过滤器上,当然如果新增数据时,除了写入到数据库中之外,也需要将数据的ID存入到布隆过滤器中。
    我们在查询某条数据时,先判断这个查询的 ID 是否存在布隆过滤器中,如果不存在就直接返回空值,而不需要继续查询数据库和缓存,存在布隆过滤器中才继续查询数据库和缓存,这样就解决缓存穿透的问题。