Flink SQL CREATE 语法

主要引用官方文档 https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/sql/create/#watermark

CREATE 语句用于注册表、视图或函数。注册后的表、视图和函数可以在 SQL 查询中使用。目前 Flink SQL 支持下列 CREATE 语句:

  • CREATE TABLE
  • CREATE DATABASE
  • CREATE VIEW
  • CREATE FUNCTION

执行 CREATE

可以使用 TableEnvironment 的 executeSql() 方法执行 CREATE 语句。若 CREATE 操作执行成功,executeSql() 方法返回 ‘OK’,否则会抛出异常。

EnvironmentSettings settings = EnvironmentSettings.newInstance()...
TableEnvironment tableEnv = TableEnvironment.create(settings);

// 对已注册的表进行 SQL 查询
// 注册名为 “Orders” 的表
tableEnv.executeSql("CREATE TABLE Orders (`user` BIGINT, product STRING, amount INT) WITH (...)");
// 在表上执行 SQL 查询,并把得到的结果作为一个新的表
Table result = tableEnv.sqlQuery(
  "SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'");

// 对已注册的表进行 INSERT 操作
// 注册 TableSink
tableEnv.executeSql("CREATE TABLE RubberOrders(product STRING, amount INT) WITH (...)");
// 在表上执行 INSERT 语句并向 TableSink 发出结果
tableEnv.executeSql(
  "INSERT INTO RubberOrders SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'");

SQL CLI

Flink SQL> CREATE TABLE Orders (`user` BIGINT, product STRING, amount INT) WITH (...);
[INFO] Table has been created.

Flink SQL> CREATE TABLE RubberOrders (product STRING, amount INT) WITH (...);
[INFO] Table has been created.

Flink SQL> INSERT INTO RubberOrders SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%';
[INFO] Submitting SQL update statement to the cluster...

CREATE TABLE

以下语法概述了 CREATE TABLE 语法:

CREATE TABLE [IF NOT EXISTS] [catalog_name.][db_name.]table_name
  (
    { <physical_column_definition> | <metadata_column_definition> | <computed_column_definition> }[ , ...n]
    [ <watermark_definition> ]
    [ <table_constraint> ][ , ...n]
  )
  [COMMENT table_comment]
  [PARTITIONED BY (partition_column_name1, partition_column_name2, ...)]
  WITH (key1=val1, key2=val2, ...)
  [ LIKE source_table [( <like_options> )] ]

<physical_column_definition>:
  column_name column_type [ <column_constraint> ] [COMMENT column_comment]

<column_constraint>:
  [CONSTRAINT constraint_name] PRIMARY KEY NOT ENFORCED

<table_constraint>:
  [CONSTRAINT constraint_name] PRIMARY KEY (column_name, ...) NOT ENFORCED

<metadata_column_definition>:
  column_name column_type METADATA [ FROM metadata_key ] [ VIRTUAL ]

<computed_column_definition>:
  column_name AS computed_column_expression [COMMENT column_comment]

<watermark_definition>:
  WATERMARK FOR rowtime_column_name AS watermark_strategy_expression

<source_table>:
  [catalog_name.][db_name.]table_name

<like_options>:
{
   { INCLUDING | EXCLUDING } { ALL | CONSTRAINTS | PARTITIONS }
 | { INCLUDING | EXCLUDING | OVERWRITING } { GENERATED | OPTIONS | WATERMARKS } 
}[, ...]

Columns

Physical / Regular Columns

Pyhsical Columns 数据库中已知的常规字段,定义了字段的名称、类型和顺序。Connectors 和 Formats 使用这些列(按定义的顺序)来进行配置。

CREATE TABLE MyTable (
  `user_id` BIGINT,
  `name` STRING
) WITH (
  ...
);

Metadata Columns

Metadata Column 是 SQL 标准的扩展(可选项),允许访问连接器、格式化表中每一行数据,由 metadata 关键字表示。

例如,可以使用 Metadata column 从 Kafka 记录中读取和写入时间戳,以进行基于时间的操作。

根据 Connector 和 Format 选择可使用的 Metadata column 。

下面定义了一个表,声明字段 record_time 使用 Metadata column timestamp:

CREATE TABLE MyTable (
  `user_id` BIGINT,
  `name` STRING,
  `record_time` TIMESTAMP_LTZ(3) METADATA FROM 'timestamp'    -- reads and writes a Kafka record's timestamp
) WITH (
  'connector' = 'kafka'
  ...
);

record_time 成为表结构的一部分,可以像常规列一样进行转换和存储:

INSERT INTO MyTable SELECT user_id, name, record_time + INTERVAL '1' SECOND FROM MyTable;

如果列名使用 Metadata column 的标识键(kafka 中的 timesamp),则可以简略写法:

CREATE TABLE MyTable (
  `user_id` BIGINT,
  `name` STRING,
  `timestamp` TIMESTAMP_LTZ(3) METADATA    -- use column name as metadata key
) WITH (
  'connector' = 'kafka'
  ...
);

如果列的数据类型与 Metadata column 的数据类型不同,则运行时将执行显式强制转换(这要求这两种数据类型是兼容的)。

CREATE TABLE MyTable (
  `user_id` BIGINT,
  `name` STRING,
  `timestamp` BIGINT METADATA    -- cast the timestamp as BIGINT
) WITH (
  'connector' = 'kafka'
  ...
);

默认情况下,Planner 假定 Metadata column 可以用于读写。但是,在许多情况下,外部系统提供的只读多于可写。因此,可以使用 VIRTUAL 关键字从持久化中排除元数据列(下面例子中的 offset)。

CREATE TABLE MyTable (
  `timestamp` BIGINT METADATA,       -- part of the query-to-sink schema
  `offset` BIGINT METADATA VIRTUAL,  -- not part of the query-to-sink schema
  `user_id` BIGINT,
  `name` STRING,
) WITH (
  'connector' = 'kafka'
  ...
);

Computed Columns

计算列是一个使用 column_name AS computed_column_expression 语法生成的虚拟列。由使用同一表中其他列的表达式生成,并且不会在表中进行物理存储。这个表达式可以包含物理列、常量、函数或变量的任意组合,但这个表达式不能存在任何子查询。

下面的例子是使用 cost AS price * quantity 定义的一个计算列

CREATE TABLE MyTable (
  `user_id` BIGINT,
  `price` DOUBLE,
  `quantity` DOUBLE,
  `cost` AS price * quanitity,  -- evaluate expression and supply the result to queries
) WITH (
  'connector' = 'kafka'
  ...
);

定义在 source table 上的计算列会在从数据源读取数据后被计算,可以在 SELECT 查询语句中使用。与使用 VIRTUAL 的 Metadata column 类似,计算列不会持久化。因此计算列不能作为 INSERT INTO 语句的目标(在 INSERT 语句中,SELECT 语句的 schema 需要与目标表不带有计算列的 schema 一致)。

计算列可用于为 CREATE TABLE 语句定义时间属性。Processing time 可以简单地通过使用了系统函数 PROCTIME() 的 proc AS PROCTIME() 语句进行定义。 Event time 可能需要从现有的字段中获得(例如,原始字段的类型不是 TIMESTAMP 或嵌套在 JSON 字符串中)。

WATERMARK

WATERMARK 定义了表的事件时间属性,其形式为 WATERMARK FOR rowtime_column_name AS watermark_strategy_expression。

rowtime_column_name 把一个现有的列定义为事件时间的属性。该列的类型必须为 TIMESTAMP(3),且是 schema 中的顶层列(top-level column),也可以是一个计算列。

watermark_strategy_expression 定义了 watermark 的生成策略。允许使用包括计算列在内的任意非子查询表达式来计算 watermark;表达式的返回类型必须是 TIMESTAMP(3)。仅当返回的 watermark 不为空且大于之前发出的 watermark 时才会被发出(以保证 watermark 递增)。

Flink 为每条记录的计算 watermark,定期(pipeline.auto-watermark-interval)发出所生成的最大的 watermark(如果 watermark 为空或不大于之前的 watermark 不发出)。若 watermark 的间隔(pipeline.auto-watermark-interval)是 0ms,那么每条记录都会产生一个 watermark(根据前述的规则发出)。

使用事件时间(Event time)语义时,表必须包含事件时间属性和 watermark 策略。

Flink 提供了几种常用的 watermark 策略:

  • 严格递增时间戳(Strictly ascending timestamps):WATERMARK FOR rowtime_column AS rowtime_column

    • 发出到目前为止已观察到的最大时间戳的 watermark,时间戳大于最大时间戳的 Row 被认为没有迟到
  • 递增时间戳(Ascending timestamps): WATERMARK FOR rowtime_column AS rowtime_column – INTERVAL ‘0.001’ SECOND

    • 发出到目前为止已观察到的最大时间戳减 1 的 watermark,时间戳大于或等于最大时间戳的 Row 被认为没有迟到
  • 有界乱序时间戳(Bounded out of orderness timestamps):WATERMARK FOR rowtime_column AS rowtime_column – INTERVAL ‘string’ timeUnit

    • 发出到目前为止已观察到的最大时间戳减去指定延迟的 watermark,例如,WATERMARK FOR rowtime_column AS rowtime_column – INTERVAL ‘5’ SECOND 是一个 5 秒延迟的 watermark 策略
CREATE TABLE Orders (
    `user` BIGINT,
    product STRING,
    order_time TIMESTAMP(3),
    WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND
) WITH ( . . . );

PRIMARY KEY

主键约束是 Flink 优化的一种提示信息,表明一张表或视图的某个(些)列是唯一的并且不包含 Null 值。主键声明的列都是非空的,可以被用作表中每行的唯一标识。

主键可以和列的定义一起声明,也可以独立声明为表的限制属性,不管是哪种方式,主键都不可以重复定义,否则 Flink 会报错。

有效性检查

SQL 标准主键限制可以有两种模式:ENFORCED 或者 NOT ENFORCED。 申明了是否输入/输出数据做检查(是否唯一)。Flink 只支持 NOT ENFORCED 模式,用户需要自己保证唯一性。

Flink 假定声明了主键的列都是不包含 Null 值的,Connector 在处理数据时需要自己保证语义正确。

在 CREATE TABLE 语句中,创建主键会修改列的 nullable 属性,主键声明的列默认都是非 Nullable 的。

PARTITIONED BY

根据指定的列对已经创建的表进行分区。若表使用 filesystem sink ,则将会为每个分区创建一个目录。

WITH Options

Table properties 用于创建 table source/sink,一般用于寻找和创建底层的连接器(Connector)。

表达式 key1=val1 的键和值必须为字符串字面量。不同 Connector 有各自的 properties。

表名可以为以下三种格式 1. catalog_name.db_name.table_name 2. db_name.table_name 3. table_name。

使用 catalog_name.db_name.table_name 的表将会与名为 catalog_name 的 catalog 和名为 db_name 的数据库一起注册到 metastore 中;使用 db_name.table_name 的表将会被注册到当前执行的 table environment 中的 catalog 且数据库会被命名为 db_name;

对于 table_name,数据表将会被注册到当前正在运行的 catalog 和数据库中。

使用 CREATE TABLE 语句注册的表均可用作 table source 和 table sink。 在被 DML 语句引用前,无法决定其实际用于 source 或是 sink。

LIKE

LIKE 子句可以基于现有表的定义去创建新表,并且可以扩展或排除原始表中的某些部分。LIKE 子句必须在 CREATE 语句中定义,并且是基于 CREATE 语句的更上层定义。LIKE 子句可以用于定义表的多个部分,而不仅仅是 schema 部分(可以重用或改写指定的连接器配置属性或者添加 watermark 定义)。

示例如下:

CREATE TABLE Orders (
    `user` BIGINT,
    product STRING,
    order_time TIMESTAMP(3)
) WITH ( 
    'connector' = 'kafka',
    'scan.startup.mode' = 'earliest-offset'
);

CREATE TABLE Orders_with_watermark (
    -- 添加 watermark 定义
    WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND 
) WITH (
    -- 改写 startup-mode 属性
    'scan.startup.mode' = 'latest-offset'
)
LIKE Orders;

Orders_with_watermark 表等效于使用以下语句创建的表:

CREATE TABLE Orders_with_watermark (
    `user` BIGINT,
    product STRING,
    order_time TIMESTAMP(3),
    WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND 
) WITH (
    'connector' = 'kafka',
    'scan.startup.mode' = 'latest-offset'
);

Merge Table

表属性的合并逻辑可以用 like options 来控制。可以控制合并的表属性如下:

  • CONSTRAINTS – 主键和唯一键约束
  • GENERATED – 计算列
  • OPTIONS – 连接器信息、格式化方式等配置项
  • PARTITIONS – 表分区信息
  • WATERMARKS – watermark 定义

并且有三种不同的表属性合并策略:

  • INCLUDING – 新表包含源表(source table)所有的表属性,如果和源表的表属性重复则会直接失败,如新表和源表存在相同 key 的属性。
  • EXCLUDING – 新表不包含源表指定的任何表属性。
  • OVERWRITING – 新表包含源表的表属性,但如果出现重复项,则会用新表的表属性覆盖源表中的重复表属性,如新表和源表存在相同 key 的属性,则会使用当前语句中定义的 key 的属性值。

可以使用 INCLUDING/EXCLUDING ALL 这种声明方式来指定使用怎样的合并策略,例如使用 EXCLUDING ALL INCLUDING WATERMARKS,代表只有源表的 WATERMARKS 属性才会被包含进新表。示例如下:

-- 存储在文件系统的源表
CREATE TABLE Orders_in_file (
    `user` BIGINT,
    product STRING,
    order_time_string STRING,
    order_time AS to_timestamp(order_time)
)
PARTITIONED BY (`user`) 
WITH ( 
    'connector' = 'filesystem',
    'path' = '...'
);

-- 对应存储在 kafka 的源表
CREATE TABLE Orders_in_kafka (
    -- 添加 watermark 定义
    WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND 
) WITH (
    'connector' = 'kafka',
    ...
)
LIKE Orders_in_file (
    -- 排除需要生成 watermark 的计算列之外的所有内容。
    -- 去除不适用于 kafka 的所有分区和文件系统的相关属性。
    EXCLUDING ALL
    INCLUDING GENERATED
);

默认将使用 INCLUDING ALL OVERWRITING OPTIONS 的合并策略。

无法选择 physical columns 的合并策略,会按照 INCLUDING 策略合并。

CREATE CATALOG

CREATE CATALOG catalog_name
  WITH (key1=val1, key2=val2, ...)

Catalog 提供了元数据信息,例如数据库、表、分区、视图以及数据库或其他外部系统中存储的函数和信息。

更多参考官方文档

CREATE DATABASE

CREATE DATABASE [IF NOT EXISTS] [catalog_name.]db_name
  [COMMENT database_comment]
  WITH (key1=val1, key2=val2, ...)

根据给定的属性创建数据库。若数据库中已存在同名表会抛出异常。

IF NOT EXISTS

若数据库已经存在,则不会进行任何操作。

WITH OPTIONS

数据库属性一般用于存储关于这个数据库额外的信息。 表达式 key1=val1 中的键和值都需要是字符串字面量。

CREATE VIEW

CREATE [TEMPORARY] VIEW [IF NOT EXISTS] [catalog_name.][db_name.]view_name
  [( columnName [, columnName ]* )] [COMMENT view_comment]
  AS query_expression

根据给定的 query 语句创建一个视图。若数据库中已经存在同名视图会抛出异常。

TEMPORARY

创建一个有 catalog 和数据库命名空间的临时视图,并覆盖原有的视图。

IF NOT EXISTS

若该视图已经存在,则不会进行任何操作。

CREATE FUNCTION

CREATE [TEMPORARY|TEMPORARY SYSTEM] FUNCTION 
  [IF NOT EXISTS] [catalog_name.][db_name.]function_name 
  AS identifier [LANGUAGE JAVA|SCALA|PYTHON]

创建 function,可以指定 catalog 和 database,若 catalog 中,已经有同名的函数注册了,则无法注册。

LANGUAGE JAVA|SCALA|PYTHON 用于指定 Flink runtime 如何执行这个函数。目前,只支持 JAVA, SCALA 和 PYTHON,且函数的默认语言为 JAVA。

如果是 JAVA 或者 SCALA,则 identifier 是 UDF 实现类的全限定名(更多参考:https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/functions/udfs/)。

如果是 PYTHON,则 identifier 是 UDF 对象的全限定名(更多参考:https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/python/table/udfs/python_udfs/)。 如果是 PYTHON,而当前程序是 Java/Scala 程序或者 SQL 程序,则需要配置 Python 相关的依赖(更多参考:https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/python/dependency_management/#python-dependency-in-javascala-program)。

TEMPORARY

创建一个临时 catalog function,有 catalog 和 database,并覆盖原有的 function 。

TEMPORARY SYSTEM

创建一个临时 system function,有 catalog,没有 database,并覆盖系统内置的 function。

IF NOT EXISTS

若该函数已经存在,则不会进行任何操作。

Tags:

Add a Comment

电子邮件地址不会被公开。 必填项已用*标注