2723 字
14 分钟
HOLOGRES + FLINK 全托管实时数仓调测试记录
HOLOGRES + FLINK 全托管实时数仓调测试记录
Flink一侧
holo_pk_test
--********************************************************************--
-- Author: zwen
-- Created Time: 2024-08-19 15:55:35
-- Description: Write your description here
-- Hints: You can use SET statements to modify the configuration
--********************************************************************--
CREATE TABLE IF NOT EXISTS `holo`.`zhongtai`.`ods.sf_device` (
CONSTRAINT `PK_sf_device` PRIMARY KEY (`appid`,`uuid`,`channel`) NOT ENFORCED
)
with(
'enableTypeNormalization' = 'true',
'sink.delete-strategy'='IGNORE_DELETE'
) AS TABLE `rds`.`swoft`.`sf_device` /*+ OPTIONS('server-id'='7001-7020','rds.endpoint'='rds-vpc.cn-hangzhou.aliyuncs.com','rds.region-id'='cn-hangzhou','rds.access-key-id'='LTAI5t***QSL','rds.access-key-secret'='egV3Q8d***VXT','rds.db-instance-id'='rm-bp***90upe1','rds.download.timeout'='180s') */;
ods_rds_swoft
--********************************************************************--
-- Author: zwen
-- Created Time: 2024-08-07 18:44:18
-- Description: Write your description here
-- Hints: You can use SET statements to modify the configuration
--********************************************************************--
SET 'table.optimizer.source-merge.enabled' = 'true';
SET 'table.cdas.scan.newly-added-table.enabled' = 'true';
-- 加速增量阶段
SET 'scan.only.deserialize.captured.tables.changelog.enabled'='true';
SET 'scan.parallel-deserialize-changelog.enabled'='true';
BEGIN STATEMENT SET;
CREATE TABLE IF NOT EXISTS `holo`.`zhongtai`.`ods.sf_sys_app` with( 'enableTypeNormalization' = 'true') AS TABLE `rds`.`swoft`.`sf_sys_app` /*+ OPTIONS('server-id'='8001-8014','rds.endpoint'='rds-vpc.cn-hangzhou.aliyuncs.com','rds.region-id'='cn-hangzhou','rds.access-key-id'='LTAI5t***QSL','rds.access-key-secret'='egV3Q8d***VXT','rds.db-instance-id'='rm-bp***90upe1','rds.download.timeout'='180s') */;
CREATE TABLE IF NOT EXISTS `holo`.`zhongtai`.`ods.sf_sys_apartments` with( 'enableTypeNormalization' = 'true') AS TABLE `rds`.`swoft`.`sf_sys_apartments` /*+ OPTIONS('server-id'='8001-8014','rds.endpoint'='rds-vpc.cn-hangzhou.aliyuncs.com','rds.region-id'='cn-hangzhou','rds.access-key-id'='LTAI5t***QSL','rds.access-key-secret'='egV3Q8d***VXT','rds.db-instance-id'='rm-bp***90upe1','rds.download.timeout'='180s') */;
CREATE TABLE IF NOT EXISTS `holo`.`zhongtai`.`ods.sf_sys_project` with( 'enableTypeNormalization' = 'true') AS TABLE `rds`.`swoft`.`sf_sys_project` /*+ OPTIONS('server-id'='8001-8014','rds.endpoint'='rds-vpc.cn-hangzhou.aliyuncs.com','rds.region-id'='cn-hangzhou','rds.access-key-id'='LTAI5t***QSL','rds.access-key-secret'='egV3Q8d***VXT','rds.db-instance-id'='rm-bp***90upe1','rds.download.timeout'='180s') */;
-- CREATE TABLE IF NOT EXISTS `holo`.`zhongtai`.`ods.sf_device` with( 'enableTypeNormalization' = 'true') AS TABLE `rds`.`swoft`.`sf_device` /*+ OPTIONS('server-id'='8001-8014','rds.endpoint'='rds-vpc.cn-hangzhou.aliyuncs.com','rds.region-id'='cn-hangzhou','rds.access-key-id'='LTAI5t***QSL','rds.access-key-secret'='egV3Q8d***VXT','rds.db-instance-id'='rm-bp***90upe1','rds.download.timeout'='180s') */;
END;
ods_rds_swoft_p
--********************************************************************--
-- Author: zwen
-- Created Time: 2024-08-06 16:30:07
-- Description: Write your description here
-- Hints: You can use SET statements to modify the configuration
--********************************************************************--
SET 'table.optimizer.source-merge.enabled' = 'true';
SET 'table.cdas.scan.newly-added-table.enabled' = 'true';
CREATE TEMPORARY TABLE `sf_order_fmt` (
`dt` AS IFNULL(FROM_UNIXTIME(`time_add`,'yyyyMMdd'),'99991231') -- 实际上我们的time_add不会为null,但是from_unixtime认为可能返会null,即认为dt可能为null,dt在后面要作为分区,不能为null 这里手动转一下。
)with(
'scan.startup.mode'='earliest-offset'
) LIKE `rds`.`swoft`.`sf_order`;
BEGIN STATEMENT SET;
CREATE TABLE IF NOT EXISTS `holo`.`zhongtai`.`ods.sf_order_p`(
CONSTRAINT `pk_order_id_dt` PRIMARY KEY(id,dt) NOT ENFORCED
)
PARTITIONED BY(dt)
with(
'enableTypeNormalization' = 'true'
)
AS TABLE `sf_order_fmt` /*+ OPTIONS('server-id'='9001-9002') */;
END;
ods_rds_swoft_ttl
--********************************************************************--
-- Author: zwen
-- Created Time: 2024-08-16 13:46:24
-- Description: Write your description here
-- Hints: You can use SET statements to modify the configuration
--********************************************************************--
SET 'table.optimizer.source-merge.enabled' = 'true';
SET 'table.cdas.scan.newly-added-table.enabled' = 'true';
CREATE TABLE IF NOT EXISTS `holo`.`zhongtai`.`ods.sf_order` with( 'enableTypeNormalization' = 'true' ) AS TABLE `rds`.`swoft`.`sf_order` /*+ OPTIONS('server-id'='8030-8040','scan.startup.mode'='timestamp','scan.startup.timestamp-millis'='1724079600000','rds.endpoint'='rds-vpc.cn-hangzhou.aliyuncs.com','rds.region-id'='cn-hangzhou','rds.access-key-id'='LTAI5t***QSL','rds.access-key-secret'='egV3Q8d***VXT','rds.db-instance-id'='rm-bp***90upe1','rds.download.timeout'='180s') */;
ods_sls_zhongtai_swoft
--********************************************************************--
-- Author: zwen
-- Created Time: 2024-08-03 12:34:41
-- Description: Write your description here
-- Hints: You can use SET statements to modify the configuration
--********************************************************************--
insert into `holo`.`zhongtai`.`ods.sls_zhongtai_swoft`
(
request
,body_bytes_sent
,sign
,request_method
,http_user_agent
,request_time
,content_type
,c_timestamp
,remote_addr
,topic
,time_stamp
,request_uri
,request_body
,up_resp_time
,request_length
,http_referer
,appid
,http_x_forwarded_for
,upstream_http_content_type
,status
,receive_time
,dt
)
select
request
,body_bytes_sent
,sign
,request_method
,http_user_agent
,request_time
,content_type
,`timestamp`
,remote_addr
,__topic__
,time_stamp
,request_uri
,request_body
,up_resp_time
,request_length
,http_referer
,appid
,http_x_forwarded_for
,upstream_http_content_type
,status
,cast(__tag__['__receive_time__'] as bigint) as receive_time
,REGEXP_REPLACE(SUBSTR(`timestamp`,0,10),'-','') as dt
from
`sls`.zhongtai.zhongtai_swoft /*+OPTIONS('startTime'='2024-08-05 00:00:00')*/
where request_uri='/report/index/'
;
create_database_test
--********************************************************************--
-- Author: zwen
-- Created Time: 2024-08-08 15:28:18
-- Description: Write your description here
-- Hints: You can use SET statements to modify the configuration
--********************************************************************--
BEGIN STATEMENT SET;
-- CREATE DATABASE IF NOT EXISTS `holo`.`zhongtaix`
-- AS DATABASE `rds`.`swoft` INCLUDING TABLE 'sf_user|sf_device' /*+ OPTIONS('server-id'='7001-7003') */;
-- CREATE SCHEMA IF NOT EXISTS `holo`.`zhongtaix`.`ods`
-- AS DATABASE `rds`.`swoft` INCLUDING TABLE 'sf_user|sf_device' /*+ OPTIONS('server-id'='7001-7003') */;
END;
tmp_result_table
--********************************************************************--
-- Author: zwen
-- Created Time: 2024-08-09 13:40:00
-- Description: Write your description here
-- Hints: You can use SET statements to modify the configuration
--********************************************************************--
insert into `holo`.`zhongtai`.`tmp.tmp_result_table`
select od.dt
,od.appid
,max(app_name) app_name
,count(1) cnt
,sum(pay_fee) cash
from
`holo`.`zhongtai`.`ods.sf_order_p` od
left join
`holo`.`zhongtai`.`dim.dim_swoft_yunying_app_channel` FOR SYSTEM_TIME AS OF proctime() AS dim
-- (select appid,max(app_name) app_name,max(is_overseas) is_overseas from `holo`.`zhongtai`.`dim.dim_swoft_yunying_app_channel` group by appid) FOR SYSTEM_TIME AS OF proctime() AS dim
ON od.appid = dim.appid and od.origin=dim.channel
where ifnull(dim.is_overseas,0)=0
and od.is_pay=1
group by od.dt,od.appid
adb_cdc
--********************************************************************--
-- Author: zwen
-- Created Time: 2024-07-30 17:00:12
-- Description: Write your description here
-- Hints: You can use SET statements to modify the configuration
--********************************************************************--
-- adb cdc数据来源表
CREATE TEMPORARY TABLE test_adb_source (
`id` BIGINT,
`province` STRING,
`amount` DOUBLE,
`ts` TIMESTAMP,
PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
'connector' = 'adb-mysql-cdc',
'hostname' = 'am-bp***320.ads.aliyuncs.com',
'username' = 'querydata',
'password' = 'Jv***c!0',
'database-name' = 'zhongtai',
'table-name' = 'flink_test'
);
-- 聚合sink表
-- CREATE TEMPORARY TABLE agg_table (
-- `province` STRING,
-- `amount_sum` DOUBLE,
-- PRIMARY KEY (`province`) NOT ENFORCED
-- )WITH (
-- 'connector' = 'print'
-- );
-- source表查询实例
-- select * from test_adb_source;
-- 简单聚合实例
-- INSERT INTO agg_table (province,amount_sum)
-- select province,sum(amount) amount_sum from test_adb_source group by province;
-- 写入holo
-- INSERT INTO holo.zt_hologres_test.test_sink_holo (province,amount_sum)
-- select province,sum(amount) amount_sum from test_adb_source group by province;
-- 测试延迟情况
INSERT INTO holo.zt_hologres_test.test_sink_holo_ts(id
,province
,amount
,ts
,ts_holo)
SELECT *,now() ts_holo FROM test_adb_source;
dwd_order
--********************************************************************--
-- Author: zwen
-- Created Time: 2024-08-05 16:54:39
-- Description: Write your description here
-- Hints: You can use SET statements to modify the configuration
--********************************************************************--
insert into `holo`.`zhongtai`.`dwd.swoft_order` /*+ OPTIONS('sink.delete-strategy'='IGNORE_DELETE') */
select
/*+
LOOKUP('jdbcReadBatchSize'='512','jdbcReadBatchQueueSize'='1024','cache'='None','table'='t2','async'='true','shuffle' = 'true' )
,LOOKUP('jdbcReadBatchSize'='2048','jdbcReadBatchQueueSize'='4096','cache'='None','table'='t3','async'='true','shuffle' = 'true' )
*/
t1.appid appid
,t1.uuid
,t1.ostr ostr
,ifnull(t2.app_name,concat(cast(t1.appid as string),'unkown-app_name')) app_name
,ifnull(t2.channel,concat(cast(t1.appid as string),'unkown-channel')) channel
,ifnull(cast(t2.is_overseas as string),concat(cast(t1.appid as string),'unkown-overseas')) is_overseas
,case when t3.appid is null then 1 when FROM_UNIXTIME(t3.time_add,'yyyy-MM-dd')=CURRENT_DATE then 1 else 0 end is_new
-- ,0 is_new
,t1.is_pay
,pay_fee cash_rmb
,us_receive_amount cash_usd
,t1.pay_time
,FROM_UNIXTIME(t1.create_time,'yyyyMMdd') create_dt
from
`holo`.`zhongtai`.`ods.sf_order` t1
left join
`holo`.`zhongtai`.`dim.dim_swoft_yunying_app_channel` FOR SYSTEM_TIME AS OF proctime() AS t2
ON t1.appid = t2.appid and t1.origin=t2.channel
left join
`holo`.`zhongtai`.`ods.sf_device` FOR SYSTEM_TIME AS OF proctime() AS t3
on t1.appid = t3.appid and t1.uuid = t3.uuid
;
dim_swoft_yunying_app_channel
--********************************************************************--
-- Author: zwen
-- Created Time: 2024-08-12 11:20:47
-- Description: Write your description here
-- Hints: You can use SET statements to modify the configuration
--********************************************************************--
INSERT INTO `holo`.`zhongtai`.`dim.dim_swoft_yunying_app_channel` /*+ OPTIONS('sink.delete-strategy'='IGNORE_DELETE') */
SELECT `ic`.`appid` AS `appid`
, `ic`.`channel` AS `channel`
, `sa`.`apartment_id` AS `apartment_id`
, `ap`.`title` AS `apartment_name`
, `sa`.`group_id` AS `group_id`
, `sg`.`title` AS `group_name`
, CASE
WHEN `sa`.`project_id` = 0 THEN '0'
ELSE `sa`.`project_id`
END AS `project_id`
, COALESCE(`sp`.`title`, '-') AS `project_name`
, `sa`.`system` AS `system`
, `dd`.`label` AS `system_name`
, `sa`.`title` AS `app_name`
, `sa`.`is_overseas` AS `is_overseas`
, COALESCE(`tt`.`type_name`, '-') AS `type_name`
, COALESCE(`w`.`region`, '-') AS `region`
, COALESCE(`w`.`invest_income_type`, '-') AS `invest_income_type`
, COALESCE(`admin`.`realname`, '-') AS `admin_name`
, COALESCE(`admin`.`id`, 0) AS `admin_id`
, `ic`.`way_id` AS `invest_way`
, COALESCE(`w`.`way_name`, '-') AS `invest_way_name`
, COALESCE(`w`.`platform_id`, 0) AS `platform_id`
, COALESCE(`platform`.`platform_name`, '-') AS `platform_name`
, COALESCE(`w`.`two_platform_id`, 0) AS `two_platform_id`
, COALESCE(`two`.`two_platform_name`, '-') AS `two_platform_name`
, COALESCE(`td`.`type_desc`, '-') AS `type_desc`
, COALESCE(`p`.`title`, '-') AS `agent_name`
, COALESCE(`p`.`note`, '-') AS `channel_remarks`
, COALESCE(`department`.`id`, '0') AS `business_group_id`
, COALESCE(`department`.`name`, '-') AS `business_group_name`
, COALESCE(`ic`.`time_add`, '0') AS `channel_timeadd`
, COALESCE(`ic`.`is_master`, '0') AS `is_master`
, COALESCE(`ic`.`invest_target`, '0') AS `invest_target`
, COALESCE(`ic`.`channel_type`, '0') AS `channel_type`
, COALESCE(`ic`.`channel_sort`, '0') AS `channel_sort`
, COALESCE(`w`.`invest_way_sort`, '0') AS `invest_way_sort`
FROM `holo`.`zhongtai`.`ods.dim_invest_channel_all` /*+ OPTIONS('upsertSource'='true') */ `ic`
LEFT JOIN `holo`.`zhongtai`.`ods.sf_sys_app` /*+ OPTIONS('upsertSource'='true') */ `sa` ON `ic`.`appid` = `sa`.`id`
LEFT JOIN `holo`.`zhongtai`.`ods.sf_sys_apartments` /*+ OPTIONS('upsertSource'='true') */ `ap` ON `sa`.`apartment_id` = `ap`.`id`
LEFT JOIN `holo`.`zhongtai`.`ods.sf_sys_project` /*+ OPTIONS('upsertSource'='true') */ `sp` ON `sa`.`project_id` = `sp`.`id`
LEFT JOIN `holo`.`zhongtai`.`ods.sf_sys_groups` /*+ OPTIONS('upsertSource'='true') */ `sg` ON `sa`.`group_id` = `sg`.`id`
LEFT JOIN `holo`.`zhongtai`.`ods.sf_ip_landing_page` /*+ OPTIONS('upsertSource'='true') */ `p`
ON `ic`.`appid` = `p`.`appid`
AND `ic`.`channel` = `p`.`origin`
LEFT JOIN `holo`.`zhongtai`.`ods.sf_sys_admin` /*+ OPTIONS('upsertSource'='true') */ `admin` ON `p`.`admin_id` = `admin`.`id`
LEFT JOIN `holo`.`zhongtai`.`ods.dim_invest_type_way` /*+ OPTIONS('upsertSource'='true') */ `w` ON `ic`.`way_id` = `w`.`way_id`
LEFT JOIN `holo`.`zhongtai`.`ods.dim_invest_type_platform` /*+ OPTIONS('upsertSource'='true') */ `platform` ON `w`.`platform_id` = `platform`.`id`
LEFT JOIN `holo`.`zhongtai`.`ods.dim_invest_type_platform_two` /*+ OPTIONS('upsertSource'='true') */ `two` ON `w`.`two_platform_id` = `two`.`id`
LEFT JOIN `holo`.`zhongtai`.`ods.dim_invest_type_desc` /*+ OPTIONS('upsertSource'='true') */ `td` ON `w`.`desc_id` = `td`.`id`
LEFT JOIN `holo`.`zhongtai`.`ods.dim_invest_type` /*+ OPTIONS('upsertSource'='true') */ `tt` ON `w`.`type_id` = `tt`.`id`
LEFT JOIN `holo`.`zhongtai`.`ods.zt_admin_member_department_relation` /*+ OPTIONS('upsertSource'='true') */ `relation` ON `admin`.`id` = `relation`.`member_id`
LEFT JOIN `holo`.`zhongtai`.`ods.zt_admin_member_department` /*+ OPTIONS('upsertSource'='true') */ `department` ON `relation`.`department_id` = `department`.`id`
LEFT JOIN `holo`.`zhongtai`.`ods.sys_dict_data` /*+ OPTIONS('upsertSource'='true') */ `dd`
ON `sa`.`system` = `dd`.`val`
AND `dd`.`type` = 'system_no'
--********************************************************************--
-- Author: zwen
-- Created Time: 2024-08-12 11:20:47
-- Description: Write your description here
-- Hints: You can use SET statements to modify the configuration
--********************************************************************--
INSERT INTO `holo`.`zhongtai`.`dim.dim_swoft_yunying_app_channel_simpleify` /*+ OPTIONS('sink.delete-strategy'='IGNORE_DELETE') */
SELECT `ic`.`appid` AS `appid`
, `ic`.`channel` AS `channel`
, `sa`.`system` AS `system`
, `dd`.`label` AS `system_name`
, `sa`.`title` AS `app_name`
, `sa`.`is_overseas` AS `is_overseas`
FROM `holo`.`zhongtai`.`ods.dim_invest_channel_all` /*+ OPTIONS('upsertSource'='true') */ `ic`
LEFT JOIN `holo`.`zhongtai`.`ods.sf_sys_app` /*+ OPTIONS('upsertSource'='true') */ `sa` ON `ic`.`appid` = `sa`.`id`
LEFT JOIN `holo`.`zhongtai`.`ods.sys_dict_data` /*+ OPTIONS('upsertSource'='true') */ `dd`
ON `sa`.`system` = `dd`.`val`
AND `dd`.`type` = 'system_no'
查询脚本
A_catalog_create
--***************************************************************************--
-- 风险警告: 对 DDL/DML 语句运行将会直接影响线上元数据/数据,请谨慎操作。
--***************************************************************************--
DROP CATALOG holox;
CREATE CATALOG holox
WITH (
'type' = 'hologres',
'property-version'='1', -- 通用参数从8.0.6支持,建议设置为1
'endpoint' = 'hgprec***gzhou-vpc-st.hologres.aliyuncs.com:80',
'username' = 'BASIC$flink_holo',
'password' = '***',
'dbname' = 'zhongtai',
'binlog' = 'true',
'sdkMode' = 'jdbc', -- 推荐使用jdbc模式。在最新版本中默认是jdbc_fixed模式;jdbc_fixed 支持过滤非法字符
'cdcmode' = 'true',
'connectionpoolname' = 'the_conn_pool',
'sink.delete-strategy' = 'IGNORE_DELETE', -- 宽表merge需要开启,防止回撤 8.0.8才开始支持。
'ignoredelete' = 'true', -- 宽表merge需要开启,防止回撤。
'partial-insert.enabled' = 'true', -- 宽表merge需要开启此参数,实现部分列更新。
'mutateType' = 'insertOrUpdate', -- 宽表merge需要开启此参数,实现部分列更新。
'remove-u0000-in-text.enabled'='true', -- 过滤非法字符,防止报错
'table_property.binlog.level' = 'replica', --也可以在创建catalog时传入持久化的hologres表属性,之后创建表时,默认都开启binlog。
'table_property.binlog.ttl' = '2592000',
'table_property.orientation'='row,column',
'ignore-non-persisted-options'='false' -- 是否忽略不可持久化参数 可持久化意味着当再次从Catalog读取该表的相关信息时,可以重新获取您在DDL中定义的一致的信息。目前仅支持endpoint、username、password和dbname可持久化选项
);
-- DROP CATALOG rds;
CREATE CATALOG `rds` WITH(
'type' = 'mysql',
'hostname' = 'rm-bp***90upe1.mysql.rds.aliyuncs.com',
'port' = '3306',
'username' = 'swoft_read',
'password' = '***',
'default-database' = 'swoft',
'catalog.table.metadata-columns' = 'op_ts;table_name;database_name',
'property-version'='1',
'catalog.table.treat-tinyint1-as-boolean'='false' -- 避免tinyint1被识别成布尔值
);
Q-query_table_info
--***************************************************************************--
-- 风险警告: 对 DDL/DML 语句运行将会直接影响线上元数据/数据,请谨慎操作。
--***************************************************************************--
show create table `holo`.`zhongtai`.`tmp.tmp_result_table_no_dim`;
alter操作
--***************************************************************************--
-- 风险警告: 对 DDL/DML 语句运行将会直接影响线上元数据/数据,请谨慎操作。
--***************************************************************************--
ALTER TABLE `holo`.`zhongtai`.`tmp.tmp_result_table` SET('ignoredelete'='false');
ALTER TABLE `holo`.`zhongtai`.`tmp.tmp_result_table` SET('sink.delete-strategy'='IGNORE_DELETE');
UDF_TEST
--***************************************************************************--
-- 风险警告: 对 DDL/DML 语句运行将会直接影响线上元数据/数据,请谨慎操作。
--***************************************************************************--
-- UDF
select udf_test_substr('24adf3423',3,5);
-- UDAF
select udaf_test_sum
from
(
select 1 as x union all
select 6 as x union all
select 3 as x
)t;
-- UDTF
CREATE TEMPORARY TABLE ASI_UDTF_Sink (
name VARCHAR,
place VARCHAR
) WITH (
'connector' = 'blackhole'
);
INSERT INTO ASI_UDTF_Sink
SELECT name,place
FROM (
select 'aaa|dddd' message union all
select 'xxx|1111' message
),lateral table(udtf_test_split(message)) as T(name,place)
SQL_PLAYGROUND
--***************************************************************************--
-- 风险警告: 对 DDL/DML 语句运行将会直接影响线上元数据/数据,请谨慎操作。
--***************************************************************************--
select FROM_UNIXTIME(1573031988,'yyyyMMdd');
SELECT DATE_FORMAT('2024-08-03T14:14:14+08:00','yyyyMMdd');
select REGEXP_REPLACE(SUBSTR('2024-08-03T14:14:14+08:00',0,10),'-','');
select CURRENT_DATE;
SELECT window_start, window_end, item, SUM(price) AS total_price
FROM
(SELECT item, price, CAST(window_start as varchar) as window_start, window_end FROM
TABLE(
SESSION(TABLE `holo`.`zhongtai`.`ods.sf_order`, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES)))
GROUP BY item, window_start, window_end;
select TYPEOF(FROM_UNIXTIME(1573031988,'yyyyMMdd'));
SET 'table.local-time-zone'='Asia/Shanghai';
SET 'table.local-time-zone'='UTC+8';
select TO_TIMESTAMP_LTZ(1724396892,0);
test_create_catalog_rds2
--***************************************************************************--
-- 风险警告: 对 DDL/DML 语句运行将会直接影响线上元数据/数据,请谨慎操作。
--***************************************************************************--
CREATE CATALOG `rds_2` WITH(
'type' = 'mysql',
'hostname' = 'rm-bp***90upe1.mysql.rds.aliyuncs.com',
'port' = '3306',
'username' = 'swoft_read',
'password' = 'u***b',
'default-database' = 'swoft',
'catalog.table.metadata-columns' = 'op_ts;table_name;database_name',
'property-version'='1',
'catalog.table.treat-tinyint1-as-boolean'='false', -- 避免tinyint1被识别成布尔值
'rds.endpoint'='rds-vpc.cn-hangzhou.aliyuncs.com',
'rds.region-id'='cn-hangzhou',
'rds.access-key-id'='LTAI5t***QSL',
'rds.access-key-secret'='egV3Q8d***VXT',
'rds.db-instance-id'='rm-bp***90upe1',
'rds.download.timeout'='180s'
);
SET 'execution.runtime-mode' = 'streaming';
EXPLAIN CHANGELOG_MODE
insert into holox.zhongtai.`ods.sf_order`
/*+ OPTIONS('ignore-non-persisted-options'='true') */
select * from rds.swoft.sf_order;
资源管理
HOLO一侧
删表
drop table ods.sf_sys_app;
drop table ods.sf_sys_apartments;
drop table ods.sf_sys_project;
drop table ods.sf_sys_groups;
drop table ods.sf_ip_landing_page;
drop table ods.sf_sys_admin;
drop table ods.zt_admin_member_department_relation;
drop table ods.zt_admin_member_department;
drop table ods.sys_dict_data;
drop table ods.dim_invest_channel_all;
drop table ods.dim_invest_type_way;
drop table ods.dim_invest_type_platform;
drop table ods.dim_invest_type_platform_two;
drop table ods.dim_invest_type_desc;
drop table ods.dim_invest_type;
drop table ods.sf_device;
drop table ods.sf_user;
查询表分区信息
SELECT
nsp_name AS schema_name,
tbl_name AS table_name,
ENABLE,
time_unit,
time_zone,
num_precreate,
num_retention,
b.usename AS create_user,
cret_time,
schd_start_time,
options
FROM
hologres.hg_partitioning_config AS a
LEFT JOIN pg_user AS b ON a.cret_user = b.usesysid;
调整表分区生命周期
ALTER TABLE ods.sf_order_p SET (
auto_partitioning_enable = 'true',
auto_partitioning_time_zone = 'Asia/Shanghai',
auto_partitioning_time_unit = 'DAY',
auto_partitioning_num_precreate = '3',
auto_partitioning_num_retention = '2'
);
BEGIN;
CALL SET_TABLE_PROPERTY('ods.sf_order', 'time_to_live_in_seconds', '2592000');
COMMIT ;
查询binlog信息
SELECT hg_binlog_lsn,hg_binlog_event_type,hg_binlog_timestamp_us,* FROM ods.sf_device;
SELECT
*
FROM
hologres.hg_table_properties
WHERE
property_key = 'binlog.level'
AND property_value = 'replica'
AND table_namespace='dim';
建表
SELECT HG_DUMP_SCRIPT('ods.sf_order_p'); -- 获取建表语句
BEGIN;
DROP TABLE IF EXISTS dim.dim_swoft_yunying_app_channel;
CREATE TABLE IF NOT EXISTS dim.dim_swoft_yunying_app_channel(
appid BIGINT,
channel TEXT,
apartment_id BIGINT,
apartment_name TEXT,
group_id BIGINT,
group_name TEXT,
project_id TEXT,
project_name TEXT,
system BIGINT,
system_name TEXT,
app_name TEXT,
is_overseas BIGINT,
type_name TEXT,
region TEXT,
invest_income_type TEXT,
admin_name TEXT,
admin_id BIGINT,
invest_way BIGINT,
invest_way_name TEXT,
platform_id BIGINT,
platform_name TEXT,
two_platform_id BIGINT,
two_platform_name TEXT,
type_desc TEXT,
agent_name TEXT,
channel_remarks TEXT,
business_group_id TEXT,
business_group_name TEXT,
channel_timeadd TEXT,
is_master TEXT,
invest_target TEXT,
channel_type TEXT,
channel_sort TEXT,
invest_way_sort TEXT,
PRIMARY KEY(appid,channel)
);
CALL SET_TABLE_PROPERTY('dim.dim_swoft_yunying_app_channel', 'orientation', 'row,column');
CALL SET_TABLE_PROPERTY('dim.dim_swoft_yunying_app_channel', 'binlog.level', 'replica');
CALL SET_TABLE_PROPERTY('dim.dim_swoft_yunying_app_channel', 'binlog.ttl', '2592000');
COMMIT;
BEGIN ;
CREATE TABLE IF NOT EXISTS tmp.tmp_result_table (
dt TEXT,
appid BIGINT,
app_name TEXT,
cnt BIGINT ,
cash FLOAT8 ,
PRIMARY KEY(dt,appid)
);
CALL SET_TABLE_PROPERTY('tmp.tmp_result_table', 'orientation', 'row,column');
CALL SET_TABLE_PROPERTY('tmp.tmp_result_table', 'binlog.level', 'replica');
CALL SET_TABLE_PROPERTY('tmp.tmp_result_table', 'binlog.ttl', '2592000');
COMMIT ;
BEGIN ;
CREATE TABLE IF NOT EXISTS tmp.tmp_wzw_test (
tag TEXT,
num BIGINT,
PRIMARY KEY(tag)
);
CALL SET_TABLE_PROPERTY('tmp.tmp_wzw_test', 'orientation', 'row,column');
CALL SET_TABLE_PROPERTY('tmp.tmp_wzw_test', 'binlog.level', 'replica');
CALL SET_TABLE_PROPERTY('tmp.tmp_wzw_test', 'binlog.ttl', '2592000');
COMMIT ;
BEGIN ;
CREATE TABLE IF NOT EXISTS tmp.tmp_result_table_no_p (
appid BIGINT,
cnt BIGINT ,
PRIMARY KEY(appid)
);
CALL SET_TABLE_PROPERTY('tmp.tmp_result_table_no_p', 'orientation', 'row,column');
CALL SET_TABLE_PROPERTY('tmp.tmp_result_table_no_p', 'binlog.level', 'replica');
CALL SET_TABLE_PROPERTY('tmp.tmp_result_table_no_p', 'binlog.ttl', '2592000');
COMMIT ;
BEGIN;
DROP TABLE IF EXISTS dim.dim_swoft_yunying_app_channel_simpleify;
CREATE TABLE IF NOT EXISTS dim.dim_swoft_yunying_app_channel_simpleify (
appid BIGINT,
channel TEXT,
system BIGINT,
system_name TEXT,
app_name TEXT,
is_overseas BIGINT,
PRIMARY KEY(appid,channel)
);
CALL SET_TABLE_PROPERTY('dim.dim_swoft_yunying_app_channel_simpleify', 'orientation', 'row,column');
CALL SET_TABLE_PROPERTY('dim.dim_swoft_yunying_app_channel_simpleify', 'binlog.level', 'replica');
CALL SET_TABLE_PROPERTY('dim.dim_swoft_yunying_app_channel_simpleify', 'binlog.ttl', '2592000');
COMMIT;
BEGIN;
DROP TABLE IF EXISTS dwd.swoft_order;
CREATE TABLE IF NOT EXISTS dwd.swoft_order (
appid BIGINT,
uuid TEXT,
ostr TEXT,
app_name TEXT,
channel TEXT,
is_overseas TEXT,
is_new BIGINT,
is_pay BIGINT,
cash_rmb FLOAT8,
cash_usd FLOAT8,
pay_time BIGINT,
create_dt TEXT,
PRIMARY KEY(appid,uuid,channel)
);
CALL SET_TABLE_PROPERTY('dwd.swoft_order', 'orientation', 'row,column');
CALL SET_TABLE_PROPERTY('dwd.swoft_order', 'binlog.level', 'replica');
CALL SET_TABLE_PROPERTY('dwd.swoft_order', 'binlog.ttl', '2592000');
COMMIT;
BEGIN;
DROP TABLE tmp.sf_order_api;
CREATE TABLE tmp.sf_order_api (
appid bigint NOT NULL,
ostr text NOT NULL,
pay_fee double precision,
is_pay bigint,
create_dt text
,PRIMARY KEY (appid, ostr)
)with (
orientation = 'row,column'
);
COMMIT;
创建scheme
create schema ods;
create schema dwd;
create schema dws;
create schema ads;
create schema tmp;
获取建表语句
SELECT HG_DUMP_SCRIPT('tmp.sf_order_api'); -- 获取建表语句
SELECT HG_DUMP_SCRIPT('ods.sf_device'); -- 获取建表语句
查询表占用空间大小
SELECT hologres.pg_relation_size('ods.sf_device','binlog');--返回单位是Byte
-- SELECT hologres.hg_relation_size('dim.dim_swoft_yunying_app_channel','[data|binlog|mv|all]') ;
SELECT hologres.hg_relation_size('ods.sf_order_p','binlog') binlog_p,hologres.hg_relation_size('ods.sf_order_p_20240814','binlog') binlog_s;
SELECT hologres.hg_relation_size('ods.sf_order_p_20240814','data') tdata,hologres.hg_relation_size('ods.sf_order_p_20240814','binlog') binlog;
SELECT hologres.hg_relation_size('ods.sf_device','data') tdata,hologres.hg_relation_size('ods.sf_device','binlog') binlog;
SELECT hologres.hg_relation_size('ods.sf_order','all') ;
SELECT pg_size_pretty(hologres.hg_relation_size('ods.sf_order','all'));
SELECT pg_size_pretty(hologres.hg_relation_size('ods.sf_order_p','all'));
TRUNCATE table ods.sf_device;
select TO_CHAR(TO_TIMESTAMP(create_time), 'YYYYMMDD'), count(1) from ods.sf_order group by TO_CHAR(TO_TIMESTAMP(create_time), 'YYYYMMDD');
DELETE FROM ods.sf_order WHERE TO_CHAR(TO_TIMESTAMP(create_time), 'YYYYMMDD') != '20240827';
dim 建表 dim.dim_swoft_yunying_app_channel
BEGIN;
CREATE TABLE IF NOT EXISTS dim.dim_swoft_yunying_app_channel (
appid BIGINT,
channel TEXT,
apartment_id BIGINT,
apartment_name TEXT,
group_id BIGINT,
group_name TEXT,
project_id TEXT,
project_name TEXT,
system BIGINT,
system_name TEXT,
app_name TEXT,
is_overseas BIGINT,
type_name TEXT,
region TEXT,
invest_income_type TEXT,
admin_name TEXT,
admin_id BIGINT,
invest_way BIGINT,
invest_way_name TEXT,
platform_id BIGINT,
platform_name TEXT,
two_platform_id BIGINT,
two_platform_name TEXT,
type_desc TEXT,
agent_name TEXT,
channel_remarks TEXT,
business_group_id TEXT,
business_group_name TEXT,
channel_timeadd TEXT,
is_master TEXT,
invest_target TEXT,
channel_type TEXT,
channel_sort TEXT,
invest_way_sort TEXT,
PRIMARY KEy(appid,channel)
);
CALL SET_TABLE_PROPERTY('ods.sf_device', 'orientation', 'row,column');
CALL SET_TABLE_PROPERTY('ods.sf_device', 'binlog.level', 'replica');
CALL SET_TABLE_PROPERTY('ods.sf_device', 'binlog.ttl', '2592000');
COMMIT;
HOLOGRES + FLINK 全托管实时数仓调测试记录
https://www.zwenooo.link/posts/published/bigdata/flink-holo-test/flink-holo-test/