
Flink Learning Notes (2)

This article covers:

  • A Flink job example
  • Understanding Flink slots and parallelism

Flink job example

Consuming Kafka from Flink: a custom KafkaDeserializationSchema

Flink already ships with some predefined deserialization schemas:
  • SimpleStringSchema: returns only the Kafka record's value, with no other information (see the sketch after this list)
  • TypeInformationKeyValueSerializationSchema: returns only the Kafka record's key and value, with no other information
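For reference, here is a minimal sketch of consuming with SimpleStringSchema; the broker, group id, and topic are the same placeholders used in the main class later in this post:

    package com.hzw.bigdata.flinkstudy;

    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

    import java.util.Properties;

    public class SimpleStringSchemaDemo {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            Properties props = new Properties();
            props.setProperty("bootstrap.servers", "golden-02:9092");
            props.setProperty("group.id", "fink-test");

            // SimpleStringSchema decodes each record's value bytes into a String;
            // offset, partition, and topic are not exposed.
            DataStream<String> values = env.addSource(
                    new FlinkKafkaConsumer<>("first", new SimpleStringSchema(), props));
            values.print();
            env.execute("SimpleStringSchema demo");
        }
    }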

If you need the Kafka topic or other metadata, you have to implement the KafkaDeserializationSchema interface yourself and customize the structure of the returned records.

  • The custom KafkaDeserializationSchema:
    package com.hzw.bigdata.flinkstudy;

    import org.apache.flink.api.common.typeinfo.TypeHint;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
    import org.apache.kafka.clients.consumer.ConsumerRecord;

    import java.nio.charset.StandardCharsets;

    public class MyKafkaDeserializationSchema implements KafkaDeserializationSchema<String> {

        @Override
        public boolean isEndOfStream(String nextElement) {
            // The Kafka stream is unbounded, so never signal end-of-stream
            return false;
        }

        @Override
        public String deserialize(ConsumerRecord<byte[], byte[]> consumerRecord) throws Exception {
            // Besides the value, the ConsumerRecord exposes offset, partition, topic, ...
            String value = new String(consumerRecord.value(), StandardCharsets.UTF_8);
            long offset = consumerRecord.offset();
            int partition = consumerRecord.partition();
            System.out.println(String.format("%s,%s,%s", value, offset, partition));
            return String.format("%s,%s,%s", value, offset, partition);
        }

        @Override
        public TypeInformation<String> getProducedType() {
            // Tell Flink the type of the records this schema produces
            return TypeInformation.of(new TypeHint<String>(){});
            // return null; // causes the InvalidTypesException shown below
        }
    }
    
    If getProducedType is not implemented properly (for example, if it returns null), the job fails with the following error:
    Exception in thread "main" org.apache.flink.api.common.functions.InvalidTypesException: The return type of function 'Custom Source' could not be determined automatically, due to type erasure. You can give type information hints by using the returns(...) method on the result of the transformation call, or by letting your function implement the 'ResultTypeQueryable' interface.
        at org.apache.flink.api.dag.Transformation.getOutputType(Transformation.java:479)
        at org.apache.flink.streaming.api.datastream.DataStream.getType(DataStream.java:193)
        at org.apache.flink.streaming.api.datastream.DataStream.flatMap(DataStream.java:613)
        at com.hzw.bigdata.flinkstudy.FlinkConsumerKafka.main(FlinkConsumerKafka.java:60)
    Caused by: org.apache.flink.api.common.functions.InvalidTypesException: Type of TypeVariable 'OUT' in 'interface org.apache.flink.streaming.api.functions.source.ParallelSourceFunction' could not be determined. This is most likely a type erasure problem. The type extraction currently supports types with generic variables only in cases where all variables in the return type can be deduced from the input type(s). Otherwise the type has to be specified explicitly using type information.
        at org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfoWithTypeHierarchy(TypeExtractor.java:923)
        at org.apache.flink.api.java.typeutils.TypeExtractor.privateCreateTypeInfo(TypeExtractor.java:828)
        at org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfo(TypeExtractor.java:787)
        at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getTypeInfo(StreamExecutionEnvironment.java:2287)
        at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1681)
        at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1668)
        at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1637)
        at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1623)
        at com.hzw.bigdata.flinkstudy.FlinkConsumerKafka.main(FlinkConsumerKafka.java:58)
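As the error message itself suggests, an alternative to a proper getProducedType implementation is to hint the type on the transformation with returns(...). A sketch, reusing the env and consumer built in the main class below (whether this alone suffices may depend on the Flink version, since the missing type info is only resolved lazily):

    // Alternative suggested by the error message: hint the source's type at the
    // call site instead of inside the schema.
    DataStream<String> stream = env.addSource(consumer)
            .returns(Types.STRING);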
  • The main class that wires it up:
    package com.hzw.bigdata.flinkstudy;

    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
    import org.apache.flink.util.Collector;

    import java.util.Properties;

    public class FlinkConsumerKafka {
        public static void main(String[] args) throws Exception {
            // Create the Flink execution environment
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);
            // Kafka parameters
            Properties properties = new Properties();
            properties.setProperty("bootstrap.servers", "golden-02:9092");
            properties.setProperty("group.id", "fink-test");
            properties.setProperty("auto.offset.reset", "earliest");
            properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            String inputTopic = "first";
            // Source: plug the custom schema into the Kafka consumer
            FlinkKafkaConsumer<String> consumer =
                    new FlinkKafkaConsumer<String>(inputTopic, new MyKafkaDeserializationSchema(), properties);
            consumer.setStartFromEarliest();
            DataStream<String> stream = env.addSource(consumer);
            DataStream<String> wordCount = stream
                    .flatMap((String line, Collector<Tuple2<String, Integer>> collector) -> {
                        collector.collect(new Tuple2<String, Integer>(line, 1));
                    })
                    // The lambda's Tuple2 type parameters are erased, so declare them explicitly
                    .returns(Types.TUPLE(Types.STRING, Types.INT))
                    .keyBy(key -> key.f0)
                    .sum(1)
                    .map(key -> (key.f0 + ":" + key.f1));
            wordCount.print();
            env.execute("kafka streaming word count");
        }
    }
    For more detail, see: Flink实战:自定义KafkaDeserializationSchema(Java/Scala)
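Note that the schema above prints the offset and partition but still not the topic name that motivated going custom in the first place. A hypothetical variant (class name mine, not from the referenced article) that also emits consumerRecord.topic() as part of a tuple:

    package com.hzw.bigdata.flinkstudy;

    import org.apache.flink.api.common.typeinfo.TypeHint;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
    import org.apache.kafka.clients.consumer.ConsumerRecord;

    import java.nio.charset.StandardCharsets;

    // Hypothetical variant: emits (topic, value) pairs instead of a flat String.
    public class TopicAwareDeserializationSchema
            implements KafkaDeserializationSchema<Tuple2<String, String>> {

        @Override
        public boolean isEndOfStream(Tuple2<String, String> nextElement) {
            return false;
        }

        @Override
        public Tuple2<String, String> deserialize(ConsumerRecord<byte[], byte[]> record) {
            // record.topic() is exactly the metadata SimpleStringSchema cannot provide
            String value = new String(record.value(), StandardCharsets.UTF_8);
            return Tuple2.of(record.topic(), value);
        }

        @Override
        public TypeInformation<Tuple2<String, String>> getProducedType() {
            return TypeInformation.of(new TypeHint<Tuple2<String, String>>(){});
        }
    }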

Understanding Flink slots and parallelism

What is a Flink slot, and how does it differ from a Spark Executor?

What is the relationship between slots and parallelism?

Does each slot have its own parallelism, or is parallelism a property of the job as a whole, unrelated to any individual slot?

If a Kafka topic has 10 partitions and Flink's parallelism is set to 1, how many threads are started to read those 10 partitions?
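On the last question: the FlinkKafkaConsumer used above assigns partitions across source subtasks, so with source parallelism 1 the single source subtask is assigned all 10 partitions; it does not start one reader thread per partition. A minimal sketch of the two levels of parallelism settings, reusing MyKafkaDeserializationSchema and the placeholder broker/topic from above:

    package com.hzw.bigdata.flinkstudy;

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

    import java.util.Properties;

    public class ParallelismDemo {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // Job-level default: every operator runs with a single parallel subtask,
            // so the one source subtask is assigned all partitions of the topic.
            env.setParallelism(1);

            Properties props = new Properties();
            props.setProperty("bootstrap.servers", "golden-02:9092");
            props.setProperty("group.id", "parallelism-demo");

            DataStream<String> stream = env.addSource(
                    new FlinkKafkaConsumer<>("first", new MyKafkaDeserializationSchema(), props));

            // Operator-level parallelism overrides the job-level default for this map only;
            // its 4 subtasks are spread over the available slots (subject to slot sharing).
            stream.map(String::toUpperCase).setParallelism(4).print();

            env.execute("parallelism demo");
        }
    }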