CREATE TABLE order_source_ms (
    id BIGINT,
    deal_amt DOUBLE,
    shop_id STRING,
    customer_id STRING,
    city_id BIGINT,
    product_count DOUBLE,
    order_at TIMESTAMP(3),
    last_updated_at TIMESTAMP(3),
    pay_at TIMESTAMP,
    refund_at TIMESTAMP,
    tenant_id STRING,
    order_category STRING,
    -- computed columns derived from the timestamp fields
    h AS HOUR(last_updated_at),
    pay_hour AS HOUR(pay_at),
    refund_hour AS HOUR(refund_at),
    m AS MINUTE(last_updated_at),
    dt AS TO_DATE(CAST(last_updated_at AS STRING)),
    pay_dt AS TO_DATE(CAST(pay_at AS STRING)),
    refund_dt AS TO_DATE(CAST(refund_at AS STRING)),
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = 'ip',
    'port' = '3306',
    'username' = 'username',
    'password' = 'password',
    'database-name' = 'databasename',
    -- start from the latest binlog position, without an initial snapshot
    'scan.startup.mode' = 'latest-offset',
    -- passed through to Debezium: do not emit delete events
    'debezium.skipped.operations' = 'd',
    'table-name' = 'tablename'
);
h as hour(last_updated_at),
pay_hour as hour(pay_at),
refund_hour as hour(refund_at),
m as MINUTE(last_updated_at),
dt as to_DATE(cast(last_updated_at as string)),
pay_dt as to_DATE(cast(pay_at as string)),
refund_dt as to_DATE(cast(refund_at as string)),
These computed columns are defined with Flink SQL built-in functions (HOUR, MINUTE, TO_DATE, CAST).
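As a rough illustration of how these computed columns might be used downstream, the query below totals the deal amount per day and hour of the last update. It is only a sketch: the aggregation itself and the output names order_cnt and total_deal_amt are assumptions made for illustration and are not part of the original job.

```sql
-- Hypothetical downstream query over the table defined above.
-- dt and h are the computed columns derived from last_updated_at.
SELECT
    dt,
    h,
    COUNT(id)     AS order_cnt,       -- illustrative output name
    SUM(deal_amt) AS total_deal_amt   -- illustrative output name
FROM order_source_ms
GROUP BY dt, h;
```

Because the source is a changelog, the result is updated continuously as rows change. With 'debezium.skipped.operations' = 'd' set on the source, deleted orders are never retracted from these totals.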
Parameter notes:
The general options are documented on the official site:
Each entry gives the option name, then (required or optional, default, type), then the description.

connector (required, default: none, String)
    Specify what connector to use; here it should be 'mysql-cdc'.

hostname (required, default: none, String)
    IP address or hostname of the MySQL database server.

username (required, default: none, String)
    Name of the MySQL user to use when connecting to the MySQL database server.

password (required, default: none, String)
    Password to use when connecting to the MySQL database server.

database-name (required, default: none, String)
    Database name of the MySQL server to monitor. The database-name also supports regular expressions, to monitor every database that matches the expression.

table-name (required, default: none, String)
    Table name of the MySQL database to monitor. The table-name also supports regular expressions, to monitor every table that matches the expression.

port (optional, default: 3306, Integer)
    Integer port number of the MySQL database server.

server-id (optional, default: none, Integer)
    A numeric ID or a numeric ID range of this database client. The numeric ID syntax is like '5400'; the numeric ID range syntax is like '5400-5408'. The range syntax is recommended when 'scan.incremental.snapshot.enabled' is enabled. Every ID must be unique across all currently running database processes in the MySQL cluster. This connector joins the MySQL cluster as another server (with this unique ID) so that it can read the binlog. By default a random number between 5400 and 6400 is generated, though setting an explicit value is recommended.

scan.incremental.snapshot.enabled (optional, default: true, Boolean)
    Incremental snapshot is a new mechanism for reading the snapshot of a table. Compared to the old snapshot mechanism it has several advantages: (1) the source can read the snapshot in parallel, (2) the source can perform checkpoints at chunk granularity during snapshot reading, (3) the source does not need to acquire a global read lock (FLUSH TABLES WITH READ LOCK) before snapshot reading. To run the source in parallel, each parallel reader needs a unique server ID, so 'server-id' must be a range like '5400-6400', and the range must be larger than the parallelism. See the Incremental Snapshot Reading section for details.

scan.incremental.snapshot.chunk.size (optional, default: 8096, Integer)
    The chunk size (number of rows) of the table snapshot. Captured tables are split into multiple chunks when the snapshot is read.

scan.snapshot.fetch.size (optional, default: 1024, Integer)
    The maximum fetch size per poll when reading the table snapshot.

scan.startup.mode (optional, default: initial, String)
    Startup mode for the MySQL CDC consumer. Valid values are "initial" and "latest-offset". See the Startup Reading Position section for details.

server-time-zone (optional, default: UTC, String)
    The session time zone of the database server, e.g. "Asia/Shanghai". It controls how the TIMESTAMP type in MySQL is converted to STRING.

debezium.min.row.count.to.stream.results (optional, default: 1000, Integer)
    During a snapshot operation, the connector queries each included table to produce a read event for every row in that table. This parameter determines whether the MySQL connection pulls all results for a table into memory (fast, but requires a large amount of memory) or streams them instead (can be slower, but works for very large tables). The value is the minimum number of rows a table must contain before the connector streams results, and defaults to 1,000. Set it to '0' to skip all table size checks and always stream all results during a snapshot.

connect.timeout (optional, default: 30s, Duration)
    The maximum time the connector should wait after trying to connect to the MySQL database server before timing out.

debezium.* (optional, default: none, String)
    Passes Debezium properties through to the Debezium Embedded Engine, which is used to capture data changes from the MySQL server. For example: 'debezium.snapshot.mode' = 'never'. See the Debezium MySQL connector properties for the full list.
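To make the interplay of 'server-id', 'scan.incremental.snapshot.enabled' and 'server-time-zone' more concrete, here is a minimal sketch of a source table set up for parallel snapshot reading. The table name order_source_parallel, the reduced column list, the ID range and the time zone are assumptions chosen for illustration; the connection values are the same placeholders used earlier.

```sql
-- Hypothetical variant of the source table, configured for a parallel initial snapshot.
CREATE TABLE order_source_parallel (
    id BIGINT,
    deal_amt DOUBLE,
    last_updated_at TIMESTAMP(3),
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = 'ip',
    'port' = '3306',
    'username' = 'username',
    'password' = 'password',
    'database-name' = 'databasename',
    'table-name' = 'tablename',
    -- read the existing rows first, then continue from the binlog
    'scan.startup.mode' = 'initial',
    -- the default, written out here to make the dependency on server-id explicit
    'scan.incremental.snapshot.enabled' = 'true',
    -- 9 IDs (5400..5408): the range has to be larger than the source parallelism
    'server-id' = '5400-5408',
    -- assumed session time zone of the MySQL server
    'server-time-zone' = 'Asia/Shanghai'
);
```

With this range the source could run with a parallelism of up to 8 while every reader still gets its own server ID. The IDs must not collide with any other client or replica of the same MySQL cluster.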