当前位置:
首页
文章
后端
详情

Apache Flink——侧输出流(side output)

前言

flink处理数据流时,经常会遇到这样的情况:处理一个数据源时,往往需要将该源中的不同类型的数据做分割(分流)处理,假如使用 filter算子对数据源进行筛选分割的话,势必会造成数据流的多次复制,造成不必要的性能浪费;

flink中的侧输出,就是将数据流进行分割,而不对流进行复制的一种分流机制。flink的侧输出的另一个作用就是对延时迟到的数据进行处理,这样就可以不必丢弃迟到的数据;

简单理解就是,根据业务上的一定规则,将一个源中的数据拆分成不同的流,即主流和侧输出流。

侧输出流(side output)

大部分的DataStream API的算子的输出是单一输出,也就是某种数据类型的流。除了split算子,可以将一条流分成多条流,这些流的数据类型也都相同。process function的side outputs功能可以产生多条流,并且这些流的数据类型可以不一样。一个side output可以定义为OutputTag[X]对象,X是输出流的数据类型。process function可以通过Context对象发射一个事件到一个或者多个side outputs。

除了从DataStream操作输出主结果流外,也可以生成任一数量的额外的侧输出流。结果流可以和主输出流的类型可以不匹配,并且侧输出流可以有不同类型。侧输出流的操作当你分流时非常有用,之前你需要先复制一个流再过滤出来,有了侧输出流,就不需要这样操作。

具体应用时,只要在处理函数的.processElement()或者.onTimer()方法中,调用上下文
的.output()方法就可以了。

DataStream<Integer> stream = env.addSource(...);

SingleOutputStreamOperator<Long> longStream stream.process(new ProcessFunction<Integer, Long>() {

    @Override
    public void processElement( Integer value, Context ctx, Collector<Integer> out) throws Exception {
        // 转换成 Long,输出到主流中
        out.collect(Long.valueOf(value));
        // 转换成 String,输出到侧输出流中
        ctx.output(outputTag, "side-output: " + String.valueOf(value));
    }

});

当使用侧输出流时,首先需要定义一个OutputTag,它将要被用来确定一个侧输出流。

OutputTag<String> outputTag = new OutputTag<String>("side-output") {};

注意:侧输出流的类型是根据侧输出流包括元素的类型来确定。

如果想要获取这个侧输出流,可以基于处理之后的 DataStream 直接调用.getSideOutput() 方法,传入对应的OutputTag,这个方式与窗口API 中获取侧输出流是完全一样的。

DataStream<String> stringStream = longStream.getSideOutput(outputTag);

可以从以下方法中来把数据输出到侧输出流

  • ProcessFunction
  • KeyedProcessFunction
  • CoProcessFunction
  • KeyedCoProcessFunction
  • ProcessWindowFunction
  • ProcessAllWindowFunction

在以上的函数中可以用参数Context来暴露给用户发送数据到侧输出流。下面例子是用ProcessFunction来发送数据到侧输出流。

import com.yibo.flink.datastream.Event;
import com.yibo.flink.sourcecustom.ClickSource;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
import scala.Tuple3;

import java.time.Duration;

/**
 * @Author: huangyibo
 * @Date: 2022/7/19 0:34
 * @Description:
 */

public class SideOutputStreamTest {

    public static void main(String[] args) throws Exception {
        //创建执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        //设置生成水位线的时间间隔
        env.getConfig().setAutoWatermarkInterval(100);

        //乱序流的Watermark生成
        SingleOutputStreamOperator<Event> stream = env.addSource(new ClickSource())
                // 插入水位线的逻辑 设置 watermark 延迟时间,2 秒
                .assignTimestampsAndWatermarks(
                        // 针对乱序流插入水位线,延迟时间设置为 2s
                        WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(2))
                                // 抽取时间戳的逻辑
                                .withTimestampAssigner((SerializableTimestampAssigner<Event>) (element, recordTimestamp) -> element.getTimestamp())
                );

        OutputTag<Tuple3<String, String, Long>> maryTag = new OutputTag<Tuple3<String, String, Long>>("Mary"){};
        OutputTag<Tuple3<String, String, Long>> boboTag = new OutputTag<Tuple3<String, String, Long>>("Bobo"){};

        SingleOutputStreamOperator<Event> processStream = stream.process(new ProcessFunction<Event, Event>() {
            @Override
            public void processElement(Event event, Context context, Collector<Event> out) throws Exception {
                if ("Mary".equals(event.getUser())) {
                    context.output(maryTag, Tuple3.apply(event.getUser(), event.getUrl(), event.getTimestamp()));
                } else if ("Bobo".equals(event.getUser())) {
                    context.output(boboTag, Tuple3.apply(event.getUser(), event.getUrl(), event.getTimestamp()));
                } else {
                    out.collect(event);
                }
            }
        });

        processStream.print("else");
        processStream.getSideOutput(maryTag).print("Mary");
        processStream.getSideOutput(boboTag).print("Bobo");

        env.execute();
    }
}
import com.yibo.flink.datastream.Event;
import com.yibo.flink.sourcecustom.ClickSource;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

import java.time.Duration;

/**
 * @Author: huangyibo
 * @Date: 2022/7/7 0:00
 * @Description: 测试迟到数据
 */

public class LateDataTest {

    public static void main(String[] args) throws Exception {
        //创建执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        //设置生成水位线的时间间隔
        env.getConfig().setAutoWatermarkInterval(100);

        //乱序流的Watermark生成
        SingleOutputStreamOperator<Event> streamOperator = env.addSource(new ClickSource())
                // 插入水位线的逻辑 设置 watermark 延迟时间,2 秒
                .assignTimestampsAndWatermarks(
                        // 针对乱序流插入水位线,延迟时间设置为 2s
                        WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(2))
                                // 抽取时间戳的逻辑
                                .withTimestampAssigner((SerializableTimestampAssigner<Event>) (element, recordTimestamp) -> element.getTimestamp())
                );

        //定义一个输出标签
        OutputTag<Event> lateTag = new OutputTag<Event>("late"){};

        //统计每个url的访问量
        SingleOutputStreamOperator<UrlViewCount> result = streamOperator.keyBy(Event::getUrl)
                //滚动事件时间窗口
                .window(TumblingEventTimeWindows.of(Time.seconds(10)))
                //允许窗口处理迟到数据 允许1分钟的延迟
                .allowedLateness(Time.minutes(1))
                //将最后的迟到数据输出到侧输出流
                .sideOutputLateData(lateTag)
                .aggregate(new UrlViewCountAgg(), new UrlViewCountResult());

        result.print("result");
        result.getSideOutput(lateTag).print("late");

        env.execute();
    }


    /**
     * 自定义实现AggregateFunction, 增量计算url页面的访问量,来一条数据就 +1
     */
    public static class UrlViewCountAgg implements AggregateFunction<Event, Long, Long> {

        @Override
        public Long createAccumulator() {
            return 0L;
        }

        @Override
        public Long add(Event value, Long accumulator) {
            return accumulator + 1;
        }

        @Override
        public Long getResult(Long accumulator) {
            return accumulator;
        }

        @Override
        public Long merge(Long a, Long b) {
            return null;
        }
    }


    /**
     * 自定义实现ProcessWindowFunction, 包装窗口信息输出
     */
    public static class UrlViewCountResult extends ProcessWindowFunction<Long, UrlViewCount, String, TimeWindow> {

        @Override
        public void process(String url, Context context, Iterable<Long> iterable, Collector<UrlViewCount> out) throws Exception {
            Long urlCount = iterable.iterator().next();
            //集合窗口信息输出
            long start = context.window().getStart();
            long end = context.window().getEnd();
            UrlViewCount urlCountView = new UrlViewCount();
            urlCountView.setUrl(url);
            urlCountView.setCount(urlCount);
            urlCountView.setWindowStart(start);
            urlCountView.setWindowEnd(end);
            out.collect(urlCountView);
        }
    }
}

Flink侧输出流有两个作用

  • 1、分隔过滤:充当filter算子功能,将源中的不同类型的数据做分割处理。因为使用filter 算子对数据源进行筛选分割的话,会造成数据流的多次复制,导致不必要的性能浪费,过滤后不需要的数据可以重新写入Pulsar或Kafka的topic中供其他应用消费处理。

  • 2、延时数据处理:在做对延时迟窗口计算时,对延时迟到的数据进行处理,即时数据迟到也不会造成丢失。

参考:
https://blog.csdn.net/rustwei/article/details/121102439

免责申明:本站发布的内容(图片、视频和文字)以转载和分享为主,文章观点不代表本站立场,如涉及侵权请联系站长邮箱:xbc-online@qq.com进行反馈,一经查实,将立刻删除涉嫌侵权内容。

同类热门文章

深入了解C++中的new操作符:使用具体实例学习

C++中的new操作符是动态分配内存的主要手段之一。在程序运行时,我们可能需要动态地创建和销毁对象,而new就是为此提供了便利。但是,使用new也常常会引发一些问题,如内存泄漏、空指针等等。因此,本文将通过具体的示例,深入介绍C++中的new操作符,帮助读者更好地掌握其使用。


深入了解C++中的new操作符:使用具体实例学习

怎么用Java反射获取包下所有类? 详细代码实例操作

Java的反射机制就是在运行状态下,对于任何一个类,它能知道这个类的所有属性和方法;对于任何一个对象,都能调用这个对象的任意一个方法。本篇文章将通过具体的代码示例,展示如何通过Java反射来获取包下的所有类。


怎么用Java反射获取包下所有类? 详细代码实例操作

员工线上学习考试系统

有点播,直播,在线支付,三级分销等功能,可以对学员学习情况的监督监控,有源码,可二次开发。支持外网和局域网私有化部署,经过测试源码完整可用!1、视频点播:视频播放,图文资料,课件下载,章节试学,限时免

员工线上学习考试系统

了解Java中的volati关键字的作用 以及具体使用方法

本篇文章将和大家分享一下Java当中的volatile关键字,下面将为各位小伙伴讲述volatile关键字的作用以及它的具体使用方法。


了解Java中的volati关键字的作用 以及具体使用方法

Java Map 所有的值转为String类型

可以使用 Java 8 中的 Map.replaceAll() 方法将所有的值转为 String 类型: 上面的代码会将 map 中所有的值都转为 String 类型。 HashMap 是 Java

Java Map 所有的值转为String类型