
Flink in Practice: Joins in FlinkSQL

2021/6/24 23:44:27

1. Testing joins with a simple FlinkSQL demo:

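import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

object FlinkJoinDemo {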
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(10)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    val settings = EnvironmentSettings.newInstance()
      .useBlinkPlanner()
      .inStreamingMode()
      .build()

    val tableEnv = StreamTableEnvironment.create(env, settings)
    tableEnv.executeSql(
      """
        |CREATE TABLE input (
        |    id BIGINT,
        |    name STRING,
        |    proctime AS PROCTIME()   -- generates processing-time attribute using computed column
        |) WITH (
        |    'connector' = 'kafka',  -- using kafka connector
        |    'topic' = 'flinksource',  -- kafka topic
        |    'scan.startup.mode' = 'latest-offset',  -- start from the latest offset, so only new records are read
        |    'properties.bootstrap.servers' = 'hadoop1:9092,hadoop2:9092,hadoop3:9092',  -- kafka broker address
        |    'format' = 'json'  -- the data format is json
        |)
        |
      """.stripMargin)


    tableEnv.executeSql(
      """
        |CREATE TABLE input2 (
        |    id BIGINT,
        |    age INT,
        |    proctime AS PROCTIME()   -- generates processing-time attribute using computed column
        |) WITH (
        |    'connector' = 'kafka',  -- using kafka connector
        |    'topic' = 'flinksource2',  -- kafka topic
        |    'scan.startup.mode' = 'latest-offset',  -- start from the latest offset, so only new records are read
        |    'properties.bootstrap.servers' = 'hadoop1:9092,hadoop2:9092,hadoop3:9092',  -- kafka broker address
        |    'format' = 'json'  -- the data format is json
        |)
        |
      """.stripMargin)



    tableEnv.executeSql(
      """
        |CREATE TABLE output (
        |    id BIGINT,
        |    name STRING,
        |    age INT
        |) WITH (
        |    'connector' = 'print'
        |)
        |
      """.stripMargin)

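    // Only the join type changes between the cases discussed below:
    // replace RIGHT JOIN with LEFT JOIN or INNER JOIN to reproduce them.
    tableEnv.executeSql(
      """
        |INSERT INTO output
        |SELECT a.id, a.name, b.age
        |FROM input a
        |RIGHT JOIN input2 b ON a.id = b.id
        |
      """.stripMargin)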
  }
}

1.1.left join

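With a left join, table a is the main table and b acts as the dimension table. A row arriving on a is emitted immediately, with age set to null if b has no matching id yet. When a row arrives on b, Flink looks up the matching id in a: if a match exists, the earlier null-padded row is retracted (-D) and the joined row is inserted (+I). A row on b with no match in a produces no output, but it stays in state, so a later row on a with the same id is joined as soon as it arrives. In the trace below, A and B are records written to input and input2, and O is the output of the print sink.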

A:{"id":2,"name":"张三"}
O: {"id":2,"name":"张三",age:null}

B:{"id":2,"age":200}
O: -D{"id":2,"name":"张三",age:null}
O: +I{"id":2,"name":"张三",age:200}

B:{"id":3,"age":100}
O: (no output)

A:{"id":3,"name":"李四"}
O:{"id":3,"name":"李四",age:100}
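The trace above can be replayed with a small in-memory model. This is only a sketch of the observable behaviour, not Flink's implementation: `LeftJoinSketch`, `ARow`, `BRow` and `run` are hypothetical names, the state is a pair of plain maps that never expire, and the `+I(...)`/`-D(...)` strings are an illustrative rendering rather than the exact print-sink format.

```scala
import scala.collection.mutable

// Hypothetical in-memory model of a streaming LEFT JOIN on id (not Flink code).
object LeftJoinSketch {
  sealed trait Event
  final case class ARow(id: Long, name: String) extends Event // record on table a
  final case class BRow(id: Long, age: Int) extends Event     // record on table b

  // Replays the events in order and returns the emitted changelog lines.
  def run(events: Seq[Event]): Vector[String] = {
    val aState = mutable.Map.empty[Long, Vector[String]] // id -> names seen on a
    val bState = mutable.Map.empty[Long, Vector[Int]]    // id -> ages seen on b
    val out = mutable.ArrayBuffer.empty[String]

    events.foreach {
      case ARow(id, name) =>
        val ages = bState.getOrElse(id, Vector.empty[Int])
        // The left side always emits: null-padded when b has no match yet.
        val rendered = if (ages.isEmpty) Vector("null") else ages.map(_.toString)
        rendered.foreach(age => out += s"+I($id,$name,$age)")
        aState(id) = aState.getOrElse(id, Vector.empty[String]) :+ name

      case BRow(id, age) =>
        // Only the first b row for an id has null-padded rows to retract.
        val firstMatch = bState.getOrElse(id, Vector.empty[Int]).isEmpty
        aState.getOrElse(id, Vector.empty[String]).foreach { name =>
          if (firstMatch) out += s"-D($id,$name,null)"
          out += s"+I($id,$name,$age)"
        }
        bState(id) = bState.getOrElse(id, Vector.empty[Int]) :+ age
    }
    out.toVector
  }

  def main(args: Array[String]): Unit =
    run(Seq(ARow(2, "张三"), BRow(2, 200), BRow(3, 100), ARow(3, "李四")))
      .foreach(println)
}
```

Running `main` with the four records from the trace yields four changelog lines in the same order as the O lines above: the null-padded insert, its retraction, the joined insert for id 2, and the immediate joined insert for id 3.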

1.2.inner join

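With an inner join, both tables are monitored in real time and joined against each other. A row is emitted only once its id has been seen on both sides; unmatched rows produce no output.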
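A minimal sketch of that behaviour, under the same caveats as any toy model: `InnerJoinSketch`, `ARow`, `BRow` and `run` are hypothetical names, state is kept in plain maps with no expiry, and the `+I(...)` strings are illustrative rather than the exact print-sink format.

```scala
import scala.collection.mutable

// Hypothetical in-memory model of a streaming INNER JOIN on id (not Flink code).
object InnerJoinSketch {
  sealed trait Event
  final case class ARow(id: Long, name: String) extends Event // record on table a
  final case class BRow(id: Long, age: Int) extends Event     // record on table b

  // Replays the events in order and returns the emitted rows.
  def run(events: Seq[Event]): Vector[String] = {
    val aState = mutable.Map.empty[Long, Vector[String]] // id -> names seen on a
    val bState = mutable.Map.empty[Long, Vector[Int]]    // id -> ages seen on b
    val out = mutable.ArrayBuffer.empty[String]

    events.foreach {
      case ARow(id, name) =>
        // Emit one row per matching b row; nothing if b has no match yet.
        bState.getOrElse(id, Vector.empty[Int])
          .foreach(age => out += s"+I($id,$name,$age)")
        aState(id) = aState.getOrElse(id, Vector.empty[String]) :+ name

      case BRow(id, age) =>
        // Symmetric: emit one row per matching a row.
        aState.getOrElse(id, Vector.empty[String])
          .foreach(name => out += s"+I($id,$name,$age)")
        bState(id) = bState.getOrElse(id, Vector.empty[Int]) :+ age
    }
    out.toVector
  }
}
```

Because nothing is emitted before a match exists, this model never needs a retraction: every output line is an insert.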

1.3.right join

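The exact mirror of left join: b is the main table and a is the dimension table. A row arriving on b is emitted immediately, with name set to null if a has no matching id yet.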

1.4.interval join

1.5.Lateral
