This article mainly covers:
- Flink job example
- Understanding Flink slots and parallelism
Flink job example
Flink consuming Kafka: a custom KafkaDeserializationSchema
Flink already ships with predefined deserialization schemas:
- SimpleStringSchema: the result contains only the Kafka value, with no other information (see the sketch after this list)
- TypeInformationKeyValueSerializationSchema: the result contains only the Kafka key and value, with no other information
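A minimal sketch of the SimpleStringSchema case is shown below. The broker address, consumer group, and topic name are placeholders; the point is only that the consumer is constructed with the built-in schema, so every element of the resulting stream is just the record value as a String.

```java
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

import java.util.Properties;

public class SimpleStringSchemaExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092"); // placeholder broker
        properties.setProperty("group.id", "simple-schema-demo");      // placeholder group id

        // With SimpleStringSchema each stream element is only the Kafka value as a String;
        // topic, partition, offset and key are not visible to downstream operators.
        FlinkKafkaConsumer<String> consumer =
                new FlinkKafkaConsumer<>("demo-topic", new SimpleStringSchema(), properties); // placeholder topic

        DataStream<String> values = env.addSource(consumer);
        values.print();

        env.execute("simple string schema example");
    }
}
```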
If you need the Kafka topic or other metadata, you have to implement the KafkaDeserializationSchema interface yourself and define the structure of the returned data.
- Custom KafkaDeserializationSchema:
```java
package com.hzw.bigdata.flinkstudy;

import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import org.apache.kafka.clients.consumer.ConsumerRecord;

import java.nio.charset.Charset;

public class MyKafkaDeserializationSchema implements KafkaDeserializationSchema<String> {

    public static final Charset UTF_8 = Charset.forName("UTF-8");

    @Override
    public boolean isEndOfStream(String nextElement) {
        return false;
    }

    @Override
    public String deserialize(ConsumerRecord<byte[], byte[]> consumerRecord) throws Exception {
        // The ConsumerRecord exposes the value plus metadata such as offset,
        // partition and topic (consumerRecord.topic()).
        String value = new String(consumerRecord.value(), UTF_8.name());
        long offset = consumerRecord.offset();
        int partition = consumerRecord.partition();
        System.out.println(String.format("%s,%s,%s", value, offset, partition));
        return String.format("%s,%s,%s", value, offset, partition);
    }

    @Override
    public TypeInformation<String> getProducedType() {
        return TypeInformation.of(new TypeHint<String>() {});
        // return null; // returning null here triggers the error below
    }
}
```

If getProducedType() is not implemented properly (for example, if it returns null), the job fails with the following error:

```
Exception in thread "main" org.apache.flink.api.common.functions.InvalidTypesException: The return type of function 'Custom Source' could not be determined automatically, due to type erasure. You can give type information hints by using the returns(...) method on the result of the transformation call, or by letting your function implement the 'ResultTypeQueryable' interface.
	at org.apache.flink.api.dag.Transformation.getOutputType(Transformation.java:479)
	at org.apache.flink.streaming.api.datastream.DataStream.getType(DataStream.java:193)
	at org.apache.flink.streaming.api.datastream.DataStream.flatMap(DataStream.java:613)
	at com.hzw.bigdata.flinkstudy.FlinkConsumerKafka.main(FlinkConsumerKafka.java:60)
Caused by: org.apache.flink.api.common.functions.InvalidTypesException: Type of TypeVariable 'OUT' in 'interface org.apache.flink.streaming.api.functions.source.ParallelSourceFunction' could not be determined. This is most likely a type erasure problem. The type extraction currently supports types with generic variables only in cases where all variables in the return type can be deduced from the input type(s). Otherwise the type has to be specified explicitly using type information.
	at org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfoWithTypeHierarchy(TypeExtractor.java:923)
	at org.apache.flink.api.java.typeutils.TypeExtractor.privateCreateTypeInfo(TypeExtractor.java:828)
	at org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfo(TypeExtractor.java:787)
	at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getTypeInfo(StreamExecutionEnvironment.java:2287)
	at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1681)
	at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1668)
	at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1637)
	at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1623)
	at com.hzw.bigdata.flinkstudy.FlinkConsumerKafka.main(FlinkConsumerKafka.java:58)
```
- Calling it from the main class:
The complete main class is as follows:

```java
package com.hzw.bigdata.flinkstudy;

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.util.Collector;

import java.util.Properties;

public class FlinkConsumerKafka {

    public static void main(String[] args) throws Exception {
        // Create the Flink execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // Kafka parameters
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "golden-02:9092");
        properties.setProperty("group.id", "fink-test");
        properties.setProperty("auto.offset.reset", "earliest");
        properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        String inputTopic = "first";

        // Source: consume the topic with the custom deserialization schema
        FlinkKafkaConsumer<String> consumer =
                new FlinkKafkaConsumer<String>(inputTopic, new MyKafkaDeserializationSchema(), properties);
        consumer.setStartFromEarliest();
        DataStream<String> stream = env.addSource(consumer);

        // Word count: each record becomes (record, 1), then is keyed and summed
        DataStream<Object> wordCount = stream
                .flatMap((String line, Collector<Tuple2<String, Integer>> collector) -> {
                    collector.collect(new Tuple2<String, Integer>(line, 1));
                })
                .returns(Types.TUPLE(Types.STRING, Types.INT))
                .keyBy(key -> key.f0)
                .sum(1)
                .map(key -> (key.f0 + ":" + key.f1));

        wordCount.print();

        env.execute("kafka streaming word count");
    }
}
```
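One design choice worth noting: the explicit .returns(Types.TUPLE(Types.STRING, Types.INT)) call plays the same role here that getProducedType() plays in the schema above. Because the flatMap is written as a Java lambda, its Tuple2<String, Integer> output type is erased at compile time, and without the hint Flink raises the same InvalidTypesException shown earlier.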
For more details, see: Flink实战:自定义KafkaDeserializationSchema(Java/Scala)
Understanding Flink slots and parallelism
What is a Flink slot, and how does it differ from a Spark executor?
What is the relationship between slots and parallelism?
Does each slot have its own parallelism, or is parallelism a property of the job as a whole, unrelated to any individual slot?
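As background for these questions, here is a minimal sketch of where parallelism is actually configured in a Flink program (the source, operators, and parallelism values are illustrative, not taken from the job above): it can be set for the whole environment and overridden per operator. With Flink's default slot sharing, the number of slots such a job needs is driven by its highest operator parallelism.

```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ParallelismExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Default parallelism for every operator that does not set its own.
        env.setParallelism(2);

        DataStream<String> upper = env
                .fromElements("a", "b", "c")   // illustrative in-memory source (non-parallel, parallelism 1)
                .map(String::toUpperCase)
                .setParallelism(4);            // per-operator override

        upper.print();                         // the sink inherits the environment default (2)

        // With the default slot sharing group, the job needs as many task slots
        // as its highest operator parallelism: max(1, 4, 2) = 4.
        env.execute("parallelism example");
    }
}
```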