2723 字
14 分钟
HOLOGRES + FLINK 全托管实时数仓调测试记录

HOLOGRES + FLINK 全托管实时数仓调测试记录

Flink 一侧

holo_pk_test

--********************************************************************--
-- Author:         zwen
-- Created Time:   2024-08-19 15:55:35
-- Description:    Write your description here
-- Hints:          You can use SET statements to modify the configuration
--********************************************************************--

-- Test CTAS: replicate RDS MySQL table swoft.sf_device into Hologres ods.sf_device,
-- declaring the composite primary key (appid, uuid, channel) explicitly.
-- Flink only accepts NOT ENFORCED primary keys; uniqueness is not checked by Flink itself.
-- NOTE(review): the access key id/secret are inlined in the source hint; prefer the
-- platform's secret/variable management over plain-text credentials in SQL.
CREATE TABLE IF NOT EXISTS `holo`.`zhongtai`.`ods.sf_device` (
    CONSTRAINT `PK_sf_device` PRIMARY KEY (`appid`, `uuid`, `channel`) NOT ENFORCED
)
WITH (
    'enableTypeNormalization' = 'true',
    -- ignore DELETE events instead of removing the rows from Hologres
    'sink.delete-strategy' = 'IGNORE_DELETE'
)
AS TABLE `rds`.`swoft`.`sf_device`
/*+ OPTIONS(
    'server-id' = '7001-7020',
    'rds.endpoint' = 'rds-vpc.cn-hangzhou.aliyuncs.com',
    'rds.region-id' = 'cn-hangzhou',
    'rds.access-key-id' = 'LTAI5t***QSL',
    'rds.access-key-secret' = 'egV3Q8d***VXT',
    'rds.db-instance-id' = 'rm-bp***90upe1',
    'rds.download.timeout' = '180s'
) */;

ods_rds_swoft

--********************************************************************--
-- Author:         zwen
-- Created Time:   2024-08-07 18:44:18
-- Description:    Write your description here
-- Hints:          You can use SET statements to modify the configuration
--********************************************************************--

-- Merge sources that share identical options into one CDC source, and enable scanning of
-- newly added tables (NOTE(review): semantics taken from the option name — confirm).
SET 'table.optimizer.source-merge.enabled' = 'true';
SET 'table.cdas.scan.newly-added-table.enabled' = 'true';
-- Speed up the incremental (binlog) phase.
-- NOTE(review): confirm these connector-level keys take effect via SET; they may need to be
-- passed in the source OPTIONS hint instead.
SET 'scan.only.deserialize.captured.tables.changelog.enabled' = 'true';
SET 'scan.parallel-deserialize-changelog.enabled' = 'true';

-- Replicate three RDS system tables into Hologres schema `ods` in one job. All source hints
-- are identical (including the shared server-id range) so the sources can be merged.
-- NOTE(review): the access key id/secret are inlined in the hints; prefer secret management.
BEGIN STATEMENT SET;

CREATE TABLE IF NOT EXISTS `holo`.`zhongtai`.`ods.sf_sys_app`
WITH ('enableTypeNormalization' = 'true')
AS TABLE `rds`.`swoft`.`sf_sys_app`
/*+ OPTIONS('server-id'='8001-8014','rds.endpoint'='rds-vpc.cn-hangzhou.aliyuncs.com','rds.region-id'='cn-hangzhou','rds.access-key-id'='LTAI5t***QSL','rds.access-key-secret'='egV3Q8d***VXT','rds.db-instance-id'='rm-bp***90upe1','rds.download.timeout'='180s') */;

CREATE TABLE IF NOT EXISTS `holo`.`zhongtai`.`ods.sf_sys_apartments`
WITH ('enableTypeNormalization' = 'true')
AS TABLE `rds`.`swoft`.`sf_sys_apartments`
/*+ OPTIONS('server-id'='8001-8014','rds.endpoint'='rds-vpc.cn-hangzhou.aliyuncs.com','rds.region-id'='cn-hangzhou','rds.access-key-id'='LTAI5t***QSL','rds.access-key-secret'='egV3Q8d***VXT','rds.db-instance-id'='rm-bp***90upe1','rds.download.timeout'='180s') */;

CREATE TABLE IF NOT EXISTS `holo`.`zhongtai`.`ods.sf_sys_project`
WITH ('enableTypeNormalization' = 'true')
AS TABLE `rds`.`swoft`.`sf_sys_project`
/*+ OPTIONS('server-id'='8001-8014','rds.endpoint'='rds-vpc.cn-hangzhou.aliyuncs.com','rds.region-id'='cn-hangzhou','rds.access-key-id'='LTAI5t***QSL','rds.access-key-secret'='egV3Q8d***VXT','rds.db-instance-id'='rm-bp***90upe1','rds.download.timeout'='180s') */;

-- Disabled: sf_device is replicated by a separate CTAS that declares its own primary key.
-- CREATE TABLE IF NOT EXISTS `holo`.`zhongtai`.`ods.sf_device` with(  'enableTypeNormalization' = 'true') AS TABLE `rds`.`swoft`.`sf_device` /*+ OPTIONS('server-id'='8001-8014','rds.endpoint'='rds-vpc.cn-hangzhou.aliyuncs.com','rds.region-id'='cn-hangzhou','rds.access-key-id'='LTAI5t***QSL','rds.access-key-secret'='egV3Q8d***VXT','rds.db-instance-id'='rm-bp***90upe1','rds.download.timeout'='180s') */;

END;

ods_rds_swoft_p

--********************************************************************--
-- Author:         zwen
-- Created Time:   2024-08-06 16:30:07
-- Description:    Write your description here
-- Hints:          You can use SET statements to modify the configuration
--********************************************************************--

SET 'table.optimizer.source-merge.enabled' = 'true';
SET 'table.cdas.scan.newly-added-table.enabled' = 'true';

-- Temporary copy of rds.swoft.sf_order with an extra computed partition column `dt` (yyyyMMdd).
-- time_add is never NULL in practice, but the planner types FROM_UNIXTIME as nullable and a
-- partition column must not be NULL, so fall back to '99991231'.
-- NOTE(review): FROM_UNIXTIME formats in the session time zone (table.local-time-zone);
-- confirm it is Asia/Shanghai so that dt is the business date.
-- NOTE(review): 'earliest-offset' presumably starts from the earliest retained binlog rather
-- than taking a full snapshot first — confirm that is intended.
CREATE TEMPORARY TABLE `sf_order_fmt` (
    `dt` AS IFNULL(FROM_UNIXTIME(`time_add`, 'yyyyMMdd'), '99991231')
)
WITH (
    'scan.startup.mode' = 'earliest-offset'
)
LIKE `rds`.`swoft`.`sf_order`;

-- Partitioned Hologres replica keyed by (id, dt); the partition column must be part of the key.
BEGIN STATEMENT SET;

CREATE TABLE IF NOT EXISTS `holo`.`zhongtai`.`ods.sf_order_p` (
    CONSTRAINT `pk_order_id_dt` PRIMARY KEY (id, dt) NOT ENFORCED
)
PARTITIONED BY (dt)
WITH (
    'enableTypeNormalization' = 'true'
)
AS TABLE `sf_order_fmt`
/*+ OPTIONS('server-id'='9001-9002') */;

END;



ods_rds_swoft_ttl

--********************************************************************--
-- Author:         zwen
-- Created Time:   2024-08-16 13:46:24
-- Description:    Write your description here
-- Hints:          You can use SET statements to modify the configuration
--********************************************************************--

SET 'table.optimizer.source-merge.enabled' = 'true';
SET 'table.cdas.scan.newly-added-table.enabled' = 'true';

-- Replicate rds.swoft.sf_order into Hologres ods.sf_order, starting from a binlog timestamp
-- (startup mode 'timestamp'): 1724079600000 ms = 2024-08-19 15:00:00 UTC (23:00 at +08:00).
-- NOTE(review): the access key id/secret are inlined in the hint; prefer secret management.
CREATE TABLE IF NOT EXISTS `holo`.`zhongtai`.`ods.sf_order`
WITH ('enableTypeNormalization' = 'true')
AS TABLE `rds`.`swoft`.`sf_order`
/*+ OPTIONS(
    'server-id' = '8030-8040',
    'scan.startup.mode' = 'timestamp',
    'scan.startup.timestamp-millis' = '1724079600000',
    'rds.endpoint' = 'rds-vpc.cn-hangzhou.aliyuncs.com',
    'rds.region-id' = 'cn-hangzhou',
    'rds.access-key-id' = 'LTAI5t***QSL',
    'rds.access-key-secret' = 'egV3Q8d***VXT',
    'rds.db-instance-id' = 'rm-bp***90upe1',
    'rds.download.timeout' = '180s'
) */;

ods_sls_zhongtai_swoft

--********************************************************************--
-- Author:         zwen
-- Created Time:   2024-08-03 12:34:41
-- Description:    Write your description here
-- Hints:          You can use SET statements to modify the configuration
--********************************************************************--

-- Stream the SLS logstore zhongtai.zhongtai_swoft (from 2024-08-05 00:00:00) into Hologres,
-- keeping only requests to /report/index/.
-- Renames: `timestamp` -> c_timestamp, __topic__ -> topic; the SLS receive time is read from
-- the __tag__ map; dt (yyyyMMdd) is derived from the ISO timestamp string.
INSERT INTO `holo`.`zhongtai`.`ods.sls_zhongtai_swoft` (
    request,
    body_bytes_sent,
    sign,
    request_method,
    http_user_agent,
    request_time,
    content_type,
    c_timestamp,
    remote_addr,
    topic,
    time_stamp,
    request_uri,
    request_body,
    up_resp_time,
    request_length,
    http_referer,
    appid,
    http_x_forwarded_for,
    upstream_http_content_type,
    status,
    receive_time,
    dt
)
SELECT
    request,
    body_bytes_sent,
    sign,
    request_method,
    http_user_agent,
    request_time,
    content_type,
    `timestamp` AS c_timestamp,
    remote_addr,
    __topic__ AS topic,
    time_stamp,
    request_uri,
    request_body,
    up_resp_time,
    request_length,
    http_referer,
    appid,
    http_x_forwarded_for,
    upstream_http_content_type,
    status,
    CAST(__tag__['__receive_time__'] AS BIGINT) AS receive_time,
    -- e.g. '2024-08-03T14:14:14+08:00' -> first 10 chars '2024-08-03' -> '20240803'
    -- (Flink's SUBSTR treats start position 0 like 1)
    REGEXP_REPLACE(SUBSTR(`timestamp`, 0, 10), '-', '') AS dt
FROM `sls`.zhongtai.zhongtai_swoft /*+OPTIONS('startTime'='2024-08-05 00:00:00')*/
WHERE request_uri = '/report/index/';

create_database_test

--********************************************************************--
-- Author:         zwen
-- Created Time:   2024-08-08 15:28:18
-- Description:    Write your description here
-- Hints:          You can use SET statements to modify the configuration
--********************************************************************--

-- Scratch test of whole-database / whole-schema sync from rds.swoft into Hologres.
-- Both variants are disabled, so the statement set below contains no statements.
-- NOTE(review): an empty STATEMENT SET may be rejected when this script runs — confirm.
BEGIN STATEMENT SET;

-- Variant 1: sync tables sf_user and sf_device into Hologres database `zhongtaix`.
-- CREATE DATABASE IF NOT EXISTS `holo`.`zhongtaix`
-- AS DATABASE `rds`.`swoft` INCLUDING TABLE 'sf_user|sf_device' /*+ OPTIONS('server-id'='7001-7003') */;

-- Variant 2: same tables into schema `ods` of Hologres database `zhongtaix`.
-- CREATE SCHEMA IF NOT EXISTS `holo`.`zhongtaix`.`ods`
-- AS DATABASE `rds`.`swoft` INCLUDING TABLE 'sf_user|sf_device' /*+ OPTIONS('server-id'='7001-7003') */;

END;

tmp_result_table

--********************************************************************--
-- Author:         zwen
-- Created Time:   2024-08-09 13:40:00
-- Description:    Write your description here
-- Hints:          You can use SET statements to modify the configuration
--********************************************************************--
-- Aggregate paid, non-overseas orders per day (dt) and app into tmp.tmp_result_table.
-- App metadata comes from a processing-time lookup join against the Hologres dim table.
-- Orders with no dim match are kept and treated as domestic (is_overseas defaults to 0).
-- Every column is qualified with its table alias so a column added later to either side
-- cannot make a reference ambiguous.
INSERT INTO `holo`.`zhongtai`.`tmp.tmp_result_table` (dt, appid, app_name, cnt, cash)
SELECT
    od.dt,
    od.appid,
    MAX(dim.app_name) AS app_name,  -- one app_name per appid even if channels carry different names
    COUNT(*)          AS cnt,
    SUM(od.pay_fee)   AS cash
FROM `holo`.`zhongtai`.`ods.sf_order_p` AS od
LEFT JOIN `holo`.`zhongtai`.`dim.dim_swoft_yunying_app_channel` FOR SYSTEM_TIME AS OF PROCTIME() AS dim
    ON od.appid = dim.appid
   AND od.origin = dim.channel
WHERE COALESCE(dim.is_overseas, 0) = 0
  AND od.is_pay = 1
GROUP BY od.dt, od.appid;

adb_cdc

--********************************************************************--
-- Author:         zwen
-- Created Time:   2024-07-30 17:00:12
-- Description:    Write your description here
-- Hints:          You can use SET statements to modify the configuration
--********************************************************************--
-- Source: AnalyticDB for MySQL CDC table zhongtai.flink_test.
-- NOTE(review): the password is inlined; prefer the platform's secret/variable management.
CREATE TEMPORARY TABLE test_adb_source (
  `id` BIGINT,
  `province` STRING,
  `amount` DOUBLE,
  `ts`  TIMESTAMP,
  PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
  'connector' = 'adb-mysql-cdc',
  'hostname' = 'am-bp***320.ads.aliyuncs.com',
  'username' = 'querydata',
  'password' = 'Jv***c!0',
  'database-name' = 'zhongtai',
  'table-name' = 'flink_test'
);

-- Aggregation sink table (print connector), used by the disabled experiments below.
-- CREATE TEMPORARY TABLE agg_table (
--   `province` STRING,
--   `amount_sum` DOUBLE,
--   PRIMARY KEY (`province`) NOT ENFORCED
-- )WITH (
--   'connector' = 'print'
-- );

-- Example: query the source table directly.
-- select * from test_adb_source;

-- Example: simple aggregation into the print sink.
-- INSERT INTO agg_table (province,amount_sum)
-- select province,sum(amount) amount_sum from test_adb_source group by province;

-- Example: write the aggregation to Hologres.
-- INSERT INTO holo.zt_hologres_test.test_sink_holo (province,amount_sum)
-- select province,sum(amount) amount_sum from test_adb_source group by province;

-- Latency test: copy every source row to Hologres and stamp it with the Flink-side current
-- time (ts_holo) so it can be compared with the source ts.
-- Source columns are listed explicitly (instead of SELECT *) so that a change to the source
-- schema cannot silently shift values into the wrong sink columns.
INSERT INTO holo.zt_hologres_test.test_sink_holo_ts (id, province, amount, ts, ts_holo)
SELECT
    id,
    province,
    amount,
    ts,
    NOW() AS ts_holo
FROM test_adb_source;

dwd_order

--********************************************************************--
-- Author:         zwen
-- Created Time:   2024-08-05 16:54:39
-- Description:    Write your description here
-- Hints:          You can use SET statements to modify the configuration
--********************************************************************--

-- Build dwd.swoft_order by enriching each ODS order with:
--   - app/channel attributes, via a lookup join on the dim table (t2);
--   - an is_new flag, via a lookup join on ods.sf_device (t3).
-- DELETE events are ignored at the sink.
-- NOTE(review): the LOOKUP hints carry connector keys (jdbcReadBatchSize, cache, ...); confirm
-- the engine honours them inside LOOKUP rather than in an OPTIONS hint on t2/t3.
-- NOTE(review): ods.sf_device's primary key is (appid, uuid, channel), but it is joined on
-- (appid, uuid) only, so one order can match several device rows — confirm intended.
INSERT INTO `holo`.`zhongtai`.`dwd.swoft_order` /*+ OPTIONS('sink.delete-strategy'='IGNORE_DELETE') */
SELECT
/*+ 
 LOOKUP('jdbcReadBatchSize'='512','jdbcReadBatchQueueSize'='1024','cache'='None','table'='t2','async'='true','shuffle' = 'true' )
,LOOKUP('jdbcReadBatchSize'='2048','jdbcReadBatchQueueSize'='4096','cache'='None','table'='t3','async'='true','shuffle' = 'true' ) 
*/
    t1.appid AS appid,
    t1.uuid,
    t1.ostr AS ostr,
    -- When the dim lookup misses, fall back to '<appid>unkown-...' marker values.
    -- NOTE(review): 'unkown' is misspelled; kept as-is because downstream data may match on it.
    IFNULL(t2.app_name, CONCAT(CAST(t1.appid AS STRING), 'unkown-app_name')) AS app_name,
    IFNULL(t2.channel, CONCAT(CAST(t1.appid AS STRING), 'unkown-channel')) AS channel,
    IFNULL(CAST(t2.is_overseas AS STRING), CONCAT(CAST(t1.appid AS STRING), 'unkown-overseas')) AS is_overseas,
    -- is_new = 1 when the device row is missing or its time_add falls on the current date.
    -- The formatted day string is cast to DATE explicitly, rather than relying on an
    -- implicit STRING/DATE comparison with CURRENT_DATE.
    CASE
        WHEN t3.appid IS NULL THEN 1
        WHEN CAST(FROM_UNIXTIME(t3.time_add, 'yyyy-MM-dd') AS DATE) = CURRENT_DATE THEN 1
        ELSE 0
    END AS is_new,
    t1.is_pay,
    t1.pay_fee AS cash_rmb,
    t1.us_receive_amount AS cash_usd,
    t1.pay_time,
    FROM_UNIXTIME(t1.create_time, 'yyyyMMdd') AS create_dt
FROM `holo`.`zhongtai`.`ods.sf_order` AS t1
LEFT JOIN `holo`.`zhongtai`.`dim.dim_swoft_yunying_app_channel` FOR SYSTEM_TIME AS OF PROCTIME() AS t2
    ON t1.appid = t2.appid
   AND t1.origin = t2.channel
LEFT JOIN `holo`.`zhongtai`.`ods.sf_device` FOR SYSTEM_TIME AS OF PROCTIME() AS t3
    ON t1.appid = t3.appid
   AND t1.uuid = t3.uuid;

dim_swoft_yunying_app_channel

--********************************************************************--
-- Author:         zwen
-- Created Time:   2024-08-12 11:20:47
-- Description:    Write your description here
-- Hints:          You can use SET statements to modify the configuration
--********************************************************************--

-- Build the full app/channel dimension: one row per (appid, channel) from
-- ods.dim_invest_channel_all.
-- Each row is left-joined to the app, apartment, group, project, landing-page, admin,
-- invest-way/platform/type, department and dictionary tables replicated into Hologres `ods`.
-- Missing lookups are replaced with '-', 0 or '0' defaults. DELETE events are ignored at
-- the sink.
-- NOTE(review): every source is read with 'upsertSource'='true'; presumably this consumes the
-- Hologres binlog as an upsert stream — confirm.
-- NOTE(review): several defaults are the string '0' on columns whose source types are not
-- visible here (e.g. department.id, ic.time_add). The Hologres target declares these columns TEXT.
INSERT INTO `holo`.`zhongtai`.`dim.dim_swoft_yunying_app_channel` /*+ OPTIONS('sink.delete-strategy'='IGNORE_DELETE') */
SELECT `ic`.`appid`                              AS `appid`
     , `ic`.`channel`                            AS `channel`
     , `sa`.`apartment_id`                       AS `apartment_id`
     , `ap`.`title`                              AS `apartment_name`
     , `sa`.`group_id`                           AS `group_id`
     , `sg`.`title`                              AS `group_name`
       -- Both branches yield project_id's value ('0' when it is 0). NOTE(review): presumably the
       -- '0' literal is there to make the result a string for the TEXT column — confirm.
     , CASE
           WHEN `sa`.`project_id` = 0 THEN '0'
           ELSE `sa`.`project_id`
    END                                          AS `project_id`
     , COALESCE(`sp`.`title`, '-')               AS `project_name`
     , `sa`.`system`                             AS `system`
     , `dd`.`label`                              AS `system_name`
     , `sa`.`title`                              AS `app_name`
     , `sa`.`is_overseas`                        AS `is_overseas`
     , COALESCE(`tt`.`type_name`, '-')           AS `type_name`
     , COALESCE(`w`.`region`, '-')               AS `region`
     , COALESCE(`w`.`invest_income_type`, '-')   AS `invest_income_type`
     , COALESCE(`admin`.`realname`, '-')         AS `admin_name`
     , COALESCE(`admin`.`id`, 0)                 AS `admin_id`
     , `ic`.`way_id`                             AS `invest_way`
     , COALESCE(`w`.`way_name`, '-')             AS `invest_way_name`
     , COALESCE(`w`.`platform_id`, 0)            AS `platform_id`
     , COALESCE(`platform`.`platform_name`, '-') AS `platform_name`
     , COALESCE(`w`.`two_platform_id`, 0)        AS `two_platform_id`
     , COALESCE(`two`.`two_platform_name`, '-')  AS `two_platform_name`
     , COALESCE(`td`.`type_desc`, '-')           AS `type_desc`
     , COALESCE(`p`.`title`, '-')                AS `agent_name`
     , COALESCE(`p`.`note`, '-')                 AS `channel_remarks`
     , COALESCE(`department`.`id`, '0')          AS `business_group_id`
     , COALESCE(`department`.`name`, '-')        AS `business_group_name`
     , COALESCE(`ic`.`time_add`, '0')            AS `channel_timeadd`
     , COALESCE(`ic`.`is_master`, '0')           AS `is_master`
     , COALESCE(`ic`.`invest_target`, '0')       AS `invest_target`
     , COALESCE(`ic`.`channel_type`, '0')        AS `channel_type`
     , COALESCE(`ic`.`channel_sort`, '0')        AS `channel_sort`
     , COALESCE(`w`.`invest_way_sort`, '0')      AS `invest_way_sort`
-- Driving table: one row per invest channel (appid, channel).
FROM `holo`.`zhongtai`.`ods.dim_invest_channel_all` /*+ OPTIONS('upsertSource'='true') */                  `ic`
LEFT JOIN `holo`.`zhongtai`.`ods.sf_sys_app`           /*+ OPTIONS('upsertSource'='true') */  `sa` ON `ic`.`appid` = `sa`.`id`
LEFT JOIN `holo`.`zhongtai`.`ods.sf_sys_apartments`    /*+ OPTIONS('upsertSource'='true') */  `ap` ON `sa`.`apartment_id` = `ap`.`id`
LEFT JOIN `holo`.`zhongtai`.`ods.sf_sys_project`       /*+ OPTIONS('upsertSource'='true') */  `sp` ON `sa`.`project_id` = `sp`.`id`
LEFT JOIN `holo`.`zhongtai`.`ods.sf_sys_groups`        /*+ OPTIONS('upsertSource'='true') */  `sg` ON `sa`.`group_id` = `sg`.`id`
-- Landing page for the channel; supplies agent name, remarks and the responsible admin.
LEFT JOIN `holo`.`zhongtai`.`ods.sf_ip_landing_page`   /*+ OPTIONS('upsertSource'='true') */  `p`
          ON `ic`.`appid` = `p`.`appid`
              AND `ic`.`channel` = `p`.`origin`
LEFT JOIN `holo`.`zhongtai`.`ods.sf_sys_admin`       /*+ OPTIONS('upsertSource'='true') */    `admin` ON `p`.`admin_id` = `admin`.`id`
LEFT JOIN `holo`.`zhongtai`.`ods.dim_invest_type_way`    /*+ OPTIONS('upsertSource'='true') */      `w` ON `ic`.`way_id` = `w`.`way_id`
LEFT JOIN `holo`.`zhongtai`.`ods.dim_invest_type_platform`     /*+ OPTIONS('upsertSource'='true') */  `platform` ON `w`.`platform_id` = `platform`.`id`
LEFT JOIN `holo`.`zhongtai`.`ods.dim_invest_type_platform_two` /*+ OPTIONS('upsertSource'='true') */  `two` ON `w`.`two_platform_id` = `two`.`id`
LEFT JOIN `holo`.`zhongtai`.`ods.dim_invest_type_desc`         /*+ OPTIONS('upsertSource'='true') */  `td` ON `w`.`desc_id` = `td`.`id`
LEFT JOIN `holo`.`zhongtai`.`ods.dim_invest_type`              /*+ OPTIONS('upsertSource'='true') */  `tt` ON `w`.`type_id` = `tt`.`id`
-- Admin -> department mapping. NOTE(review): if an admin belongs to several departments, this
-- emits several rows for the same (appid, channel), and the upsert sink keeps only one of them.
LEFT JOIN `holo`.`zhongtai`.`ods.zt_admin_member_department_relation` /*+ OPTIONS('upsertSource'='true') */ `relation` ON `admin`.`id` = `relation`.`member_id`
LEFT JOIN `holo`.`zhongtai`.`ods.zt_admin_member_department`     /*+ OPTIONS('upsertSource'='true') */     `department` ON `relation`.`department_id` = `department`.`id`
-- Dictionary label for the app's system number (entries of type 'system_no' only).
LEFT JOIN `holo`.`zhongtai`.`ods.sys_dict_data`     /*+ OPTIONS('upsertSource'='true') */     `dd`
          ON `sa`.`system` = `dd`.`val`
              AND `dd`.`type` = 'system_no'
--********************************************************************--
-- Author:         zwen
-- Created Time:   2024-08-12 11:20:47
-- Description:    Write your description here
-- Hints:          You can use SET statements to modify the configuration
--********************************************************************--

-- Slim variant of the app/channel dimension: only the system and app attributes per
-- (appid, channel). Sources are read with 'upsertSource'='true'; DELETE events are ignored
-- at the sink.
INSERT INTO `holo`.`zhongtai`.`dim.dim_swoft_yunying_app_channel_simpleify` /*+ OPTIONS('sink.delete-strategy'='IGNORE_DELETE') */
SELECT
    `ic`.`appid`       AS `appid`,
    `ic`.`channel`     AS `channel`,
    `sa`.`system`      AS `system`,
    `dd`.`label`       AS `system_name`,
    `sa`.`title`       AS `app_name`,
    `sa`.`is_overseas` AS `is_overseas`
FROM `holo`.`zhongtai`.`ods.dim_invest_channel_all` /*+ OPTIONS('upsertSource'='true') */ AS `ic`
LEFT JOIN `holo`.`zhongtai`.`ods.sf_sys_app` /*+ OPTIONS('upsertSource'='true') */ AS `sa`
    ON `sa`.`id` = `ic`.`appid`
-- dictionary label for the app's system number (entries of type 'system_no' only)
LEFT JOIN `holo`.`zhongtai`.`ods.sys_dict_data` /*+ OPTIONS('upsertSource'='true') */ AS `dd`
    ON `dd`.`val` = `sa`.`system`
   AND `dd`.`type` = 'system_no';

查询脚本

A_catalog_create

--***************************************************************************--
-- 风险警告: 对 DDL/DML 语句运行将会直接影响线上元数据/数据,请谨慎操作。
--***************************************************************************--

-- (Re)create the Hologres catalog `holox`. IF EXISTS lets the script run even when the
-- catalog has not been created yet; a plain DROP CATALOG fails in that case.
DROP CATALOG IF EXISTS holox;
CREATE CATALOG holox
WITH (
  'type' = 'hologres',
  'property-version' = '1',  -- generic options are supported from 8.0.6; setting 1 is recommended
  'endpoint' = 'hgprec***gzhou-vpc-st.hologres.aliyuncs.com:80',
  'username' = 'BASIC$flink_holo',
  'password' = '***',
  'dbname' = 'zhongtai',
  'binlog' = 'true',
  'sdkMode' = 'jdbc',  -- jdbc mode is recommended; newer versions default to jdbc_fixed, which can filter illegal characters
  'cdcmode' = 'true',
  'connectionpoolname' = 'the_conn_pool',
  'sink.delete-strategy' = 'IGNORE_DELETE',  -- needed for wide-table merge to avoid retractions; supported from 8.0.8
  'ignoredelete' = 'true',  -- needed for wide-table merge to avoid retractions (NOTE(review): presumably the pre-8.0.8 form of the option above)
  'partial-insert.enabled' = 'true',  -- needed for wide-table merge: partial column updates
  'mutateType' = 'insertOrUpdate',  -- needed for wide-table merge: partial column updates
  'remove-u0000-in-text.enabled' = 'true',  -- strip \u0000 characters from text to avoid write errors
  'table_property.binlog.level' = 'replica',  -- persisted Hologres table properties can be passed here, so tables created through the catalog get binlog enabled by default
  'table_property.binlog.ttl' = '2592000',  -- 30 days, in seconds
  'table_property.orientation' = 'row,column',
  'ignore-non-persisted-options' = 'false'  -- whether to ignore options that cannot be persisted (persisted = returned again, as declared in the DDL, when the table is read back from the catalog); currently only endpoint, username, password and dbname are persisted
);


-- DROP CATALOG IF EXISTS rds;
-- MySQL catalog for the RDS instance that hosts database `swoft`.
CREATE CATALOG `rds` WITH (
  'type' = 'mysql',
  'hostname' = 'rm-bp***90upe1.mysql.rds.aliyuncs.com',
  'port' = '3306',
  'username' = 'swoft_read',
  'password' = '***',
  'default-database' = 'swoft',
  'catalog.table.metadata-columns' = 'op_ts;table_name;database_name',  -- metadata columns exposed on each table (per the option name)
  'property-version' = '1',
  'catalog.table.treat-tinyint1-as-boolean' = 'false'  -- keep TINYINT(1) numeric instead of mapping it to BOOLEAN
);

Q-query_table_info

--***************************************************************************--
-- 风险警告: 对 DDL/DML 语句运行将会直接影响线上元数据/数据,请谨慎操作。
--***************************************************************************--

-- Print the DDL that Flink derives for Hologres table tmp.tmp_result_table_no_dim.
SHOW CREATE TABLE `holo`.`zhongtai`.`tmp.tmp_result_table_no_dim`;

alter操作

--***************************************************************************--
-- 风险警告: 对 DDL/DML 语句运行将会直接影响线上元数据/数据,请谨慎操作。
--***************************************************************************--

-- Change delete handling on the catalog table tmp.tmp_result_table.
-- NOTE(review): the two settings point in opposite directions (ignoredelete=false vs.
-- sink.delete-strategy=IGNORE_DELETE); confirm which one the connector honours when both are set.
ALTER TABLE `holo`.`zhongtai`.`tmp.tmp_result_table` SET ('ignoredelete' = 'false');
ALTER TABLE `holo`.`zhongtai`.`tmp.tmp_result_table` SET ('sink.delete-strategy' = 'IGNORE_DELETE');

UDF_TEST

--***************************************************************************--
-- 风险警告: 对 DDL/DML 语句运行将会直接影响线上元数据/数据,请谨慎操作。
--***************************************************************************--

-- UDF: scalar function smoke test.
SELECT udf_test_substr('24adf3423', 3, 5);

-- UDAF: aggregate function smoke test over three literal rows.
-- The aggregate must be invoked with its input column. A bare `udaf_test_sum` is resolved
-- as a column reference and fails validation.
SELECT udaf_test_sum(x)
FROM (
    SELECT 1 AS x UNION ALL
    SELECT 6 AS x UNION ALL
    SELECT 3 AS x
) AS t;

-- UDTF: table function smoke test. Each message is expanded into (name, place) rows
-- (presumably split on '|') and written to a blackhole sink.
CREATE TEMPORARY TABLE ASI_UDTF_Sink (
  name  VARCHAR,
  place VARCHAR
) WITH (
  'connector' = 'blackhole'
);

INSERT INTO ASI_UDTF_Sink
SELECT T.name, T.place
FROM (
    SELECT 'aaa|dddd' AS message UNION ALL
    SELECT 'xxx|1111' AS message
) AS m,
LATERAL TABLE(udtf_test_split(m.message)) AS T(name, place);

SQL_PLAYGROUND

--***************************************************************************--
-- 风险警告: 对 DDL/DML 语句运行将会直接影响线上元数据/数据,请谨慎操作。
--***************************************************************************--

-- Epoch seconds -> 'yyyyMMdd' string, rendered in the session time zone.
select FROM_UNIXTIME(1573031988,'yyyyMMdd');

-- NOTE(review): DATE_FORMAT on a string parses it with a default pattern; an ISO value with 'T'
-- and an offset may not parse. Presumably this is why the SLS job uses SUBSTR/REGEXP_REPLACE.
SELECT DATE_FORMAT('2024-08-03T14:14:14+08:00','yyyyMMdd');

-- Same dt derivation as the SLS job: first 10 characters, dashes removed -> '20240803'.
select REGEXP_REPLACE(SUBSTR('2024-08-03T14:14:14+08:00',0,10),'-','');

select CURRENT_DATE;


-- Session-window aggregation (5-minute gap).
-- window_start/window_end must reach the GROUP BY directly from the window TVF. Casting
-- window_start in a subquery before grouping can stop the planner from recognising a window
-- aggregation, so the cast is applied after aggregating.
-- NOTE(review): bidtime/item/price come from the Flink docs' Bid example. ods.sf_order must
-- expose such columns, with bidtime as a time attribute, for this query to run — confirm.
SELECT
    CAST(window_start AS VARCHAR) AS window_start,
    window_end,
    item,
    SUM(price) AS total_price
FROM TABLE(
    SESSION(TABLE `holo`.`zhongtai`.`ods.sf_order`, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES)
)
GROUP BY item, window_start, window_end;


select  TYPEOF(FROM_UNIXTIME(1573031988,'yyyyMMdd'));

-- Session time zone. Flink accepts a TZDB id ('Asia/Shanghai') or a fixed offset written as
-- 'GMT+08:00'; 'UTC+8' is rejected as an invalid time zone. The second SET overrides the first.
SET 'table.local-time-zone'='Asia/Shanghai';
SET 'table.local-time-zone'='GMT+08:00';
select TO_TIMESTAMP_LTZ(1724396892,0);

test_create_catalog_rds2

--***************************************************************************--
-- 风险警告: 对 DDL/DML 语句运行将会直接影响线上元数据/数据,请谨慎操作。
--***************************************************************************--


-- Second MySQL catalog on the same RDS instance. Unlike `rds`, it carries the rds.* options
-- (endpoint, region, access key, instance id, download timeout) at catalog level.
-- NOTE(review): presumably so that jobs need no per-table rds.* hints — confirm.
-- NOTE(review): credentials are inlined; prefer the platform's secret/variable management.
CREATE CATALOG `rds_2` WITH (
  'type' = 'mysql',
  'hostname' = 'rm-bp***90upe1.mysql.rds.aliyuncs.com',
  'port' = '3306',
  'username' = 'swoft_read',
  'password' = 'u***b',
  'default-database' = 'swoft',
  'catalog.table.metadata-columns' = 'op_ts;table_name;database_name',
  'property-version' = '1',
  'catalog.table.treat-tinyint1-as-boolean' = 'false',  -- keep TINYINT(1) numeric instead of mapping it to BOOLEAN
  'rds.endpoint' = 'rds-vpc.cn-hangzhou.aliyuncs.com',
  'rds.region-id' = 'cn-hangzhou',
  'rds.access-key-id' = 'LTAI5t***QSL',
  'rds.access-key-secret' = 'egV3Q8d***VXT',
  'rds.db-instance-id' = 'rm-bp***90upe1',
  'rds.download.timeout' = '180s'
);


-- Inspect the changelog kinds produced by an RDS -> Hologres copy in streaming mode.
-- NOTE(review): this reads from catalog `rds`, not the `rds_2` catalog created above — confirm.
SET 'execution.runtime-mode' = 'streaming';
EXPLAIN CHANGELOG_MODE
INSERT INTO holox.zhongtai.`ods.sf_order`
/*+ OPTIONS('ignore-non-persisted-options'='true') */
SELECT * FROM rds.swoft.sf_order;

资源管理

image.png

Hologres 一侧

删表

-- Drop the ODS replica tables (presumably so the Flink CTAS jobs can recreate them).
-- IF EXISTS makes the script re-runnable: a table that is already gone no longer aborts it.
DROP TABLE IF EXISTS ods.sf_sys_app;
DROP TABLE IF EXISTS ods.sf_sys_apartments;
DROP TABLE IF EXISTS ods.sf_sys_project;
DROP TABLE IF EXISTS ods.sf_sys_groups;
DROP TABLE IF EXISTS ods.sf_ip_landing_page;
DROP TABLE IF EXISTS ods.sf_sys_admin;
DROP TABLE IF EXISTS ods.zt_admin_member_department_relation;
DROP TABLE IF EXISTS ods.zt_admin_member_department;
DROP TABLE IF EXISTS ods.sys_dict_data;
DROP TABLE IF EXISTS ods.dim_invest_channel_all;
DROP TABLE IF EXISTS ods.dim_invest_type_way;
DROP TABLE IF EXISTS ods.dim_invest_type_platform;
DROP TABLE IF EXISTS ods.dim_invest_type_platform_two;
DROP TABLE IF EXISTS ods.dim_invest_type_desc;
DROP TABLE IF EXISTS ods.dim_invest_type;
DROP TABLE IF EXISTS ods.sf_device;
DROP TABLE IF EXISTS ods.sf_user;

查询表分区信息

-- Dynamic-partitioning configuration of every table, with the creating user's name.
SELECT
    a.nsp_name        AS schema_name,
    a.tbl_name        AS table_name,
    a.enable,
    a.time_unit,
    a.time_zone,
    a.num_precreate,
    a.num_retention,
    b.usename         AS create_user,
    a.cret_time,
    a.schd_start_time,
    a.options
FROM hologres.hg_partitioning_config AS a
LEFT JOIN pg_user AS b
    ON b.usesysid = a.cret_user;

调整表分区生命周期

-- Dynamic partitioning for ods.sf_order_p: daily partitions in Asia/Shanghai time,
-- 3 partitions pre-created ahead and a retention count of 2.
ALTER TABLE ods.sf_order_p SET (
    auto_partitioning_enable        = 'true',
    auto_partitioning_time_zone     = 'Asia/Shanghai',
    auto_partitioning_time_unit     = 'DAY',
    auto_partitioning_num_precreate = '3',
    auto_partitioning_num_retention = '2'
);


-- Data TTL for ods.sf_order: 2592000 s = 30 days.
BEGIN;
CALL SET_TABLE_PROPERTY('ods.sf_order', 'time_to_live_in_seconds', '2592000');
COMMIT;

查询binlog信息

-- Read ods.sf_device together with its binlog system fields (LSN, event type, timestamp).
SELECT hg_binlog_lsn, hg_binlog_event_type, hg_binlog_timestamp_us, *
FROM ods.sf_device;

-- Tables in schema `dim` whose binlog level is 'replica'.
SELECT *
FROM hologres.hg_table_properties
WHERE table_namespace = 'dim'
  AND property_key    = 'binlog.level'
  AND property_value  = 'replica';

建表

-- Print the CREATE TABLE script (DDL) of ods.sf_order_p.
SELECT hg_dump_script('ods.sf_order_p');


-- Recreate the full app/channel dimension table, keyed by (appid, channel).
-- Row-column hybrid storage; binlog level 'replica' with a 30-day TTL.
-- Destructive: the existing table and its data are dropped first.
BEGIN;

DROP TABLE IF EXISTS dim.dim_swoft_yunying_app_channel;
CREATE TABLE IF NOT EXISTS dim.dim_swoft_yunying_app_channel (
    appid               bigint,
    channel             text,
    apartment_id        bigint,
    apartment_name      text,
    group_id            bigint,
    group_name          text,
    project_id          text,
    project_name        text,
    system              bigint,
    system_name         text,
    app_name            text,
    is_overseas         bigint,
    type_name           text,
    region              text,
    invest_income_type  text,
    admin_name          text,
    admin_id            bigint,
    invest_way          bigint,
    invest_way_name     text,
    platform_id         bigint,
    platform_name       text,
    two_platform_id     bigint,
    two_platform_name   text,
    type_desc           text,
    agent_name          text,
    channel_remarks     text,
    business_group_id   text,
    business_group_name text,
    channel_timeadd     text,
    is_master           text,
    invest_target       text,
    channel_type        text,
    channel_sort        text,
    invest_way_sort     text,
    PRIMARY KEY (appid, channel)
);
CALL SET_TABLE_PROPERTY('dim.dim_swoft_yunying_app_channel', 'orientation', 'row,column');
CALL SET_TABLE_PROPERTY('dim.dim_swoft_yunying_app_channel', 'binlog.level', 'replica');
CALL SET_TABLE_PROPERTY('dim.dim_swoft_yunying_app_channel', 'binlog.ttl', '2592000');  -- 30 days

COMMIT;


-- Daily per-app order aggregates written by the Flink tmp_result_table job, keyed by (dt, appid).
-- Row-column hybrid storage; binlog level 'replica' with a 30-day TTL.
BEGIN;
CREATE TABLE IF NOT EXISTS tmp.tmp_result_table (
    dt       text,
    appid    bigint,
    app_name text,
    cnt      bigint,
    cash     float8,
    PRIMARY KEY (dt, appid)
);
CALL SET_TABLE_PROPERTY('tmp.tmp_result_table', 'orientation', 'row,column');
CALL SET_TABLE_PROPERTY('tmp.tmp_result_table', 'binlog.level', 'replica');
CALL SET_TABLE_PROPERTY('tmp.tmp_result_table', 'binlog.ttl', '2592000');  -- 30 days
COMMIT;


-- Scratch table: one counter per tag. Row-column storage; 'replica' binlog kept for 30 days.
BEGIN;
CREATE TABLE IF NOT EXISTS tmp.tmp_wzw_test (
    tag text,
    num bigint,
    PRIMARY KEY (tag)
);
CALL SET_TABLE_PROPERTY('tmp.tmp_wzw_test', 'orientation', 'row,column');
CALL SET_TABLE_PROPERTY('tmp.tmp_wzw_test', 'binlog.level', 'replica');
CALL SET_TABLE_PROPERTY('tmp.tmp_wzw_test', 'binlog.ttl', '2592000');  -- 30 days
COMMIT;


-- Scratch result table keyed by appid only (no dt column).
-- Row-column storage; 'replica' binlog kept for 30 days.
BEGIN;
CREATE TABLE IF NOT EXISTS tmp.tmp_result_table_no_p (
    appid bigint,
    cnt   bigint,
    PRIMARY KEY (appid)
);
CALL SET_TABLE_PROPERTY('tmp.tmp_result_table_no_p', 'orientation', 'row,column');
CALL SET_TABLE_PROPERTY('tmp.tmp_result_table_no_p', 'binlog.level', 'replica');
CALL SET_TABLE_PROPERTY('tmp.tmp_result_table_no_p', 'binlog.ttl', '2592000');  -- 30 days
COMMIT;



-- Recreate the slim app/channel dimension table, keyed by (appid, channel).
-- Row-column storage; 'replica' binlog kept for 30 days.
-- Destructive: the existing table and its data are dropped first.
BEGIN;

DROP TABLE IF EXISTS dim.dim_swoft_yunying_app_channel_simpleify;
CREATE TABLE IF NOT EXISTS dim.dim_swoft_yunying_app_channel_simpleify (
    appid       bigint,
    channel     text,
    system      bigint,
    system_name text,
    app_name    text,
    is_overseas bigint,
    PRIMARY KEY (appid, channel)
);
CALL SET_TABLE_PROPERTY('dim.dim_swoft_yunying_app_channel_simpleify', 'orientation', 'row,column');
CALL SET_TABLE_PROPERTY('dim.dim_swoft_yunying_app_channel_simpleify', 'binlog.level', 'replica');
CALL SET_TABLE_PROPERTY('dim.dim_swoft_yunying_app_channel_simpleify', 'binlog.ttl', '2592000');  -- 30 days

COMMIT;


-- Recreate the DWD order table written by the Flink dwd_order job.
-- Row-column storage; 'replica' binlog kept for 30 days.
-- Destructive: the existing table and its data are dropped first.
-- NOTE(review): the key (appid, uuid, channel) keeps one row per user and channel, so later
-- orders of the same user overwrite earlier ones on upsert. If one row per order is intended,
-- key on the order number (ostr) instead — confirm.
BEGIN;

DROP TABLE IF EXISTS dwd.swoft_order;
CREATE TABLE IF NOT EXISTS dwd.swoft_order (
    appid       bigint,
    uuid        text,
    ostr        text,
    app_name    text,
    channel     text,
    is_overseas text,
    is_new      bigint,
    is_pay      bigint,
    cash_rmb    float8,
    cash_usd    float8,
    pay_time    bigint,
    create_dt   text,
    PRIMARY KEY (appid, uuid, channel)
);
CALL SET_TABLE_PROPERTY('dwd.swoft_order', 'orientation', 'row,column');
CALL SET_TABLE_PROPERTY('dwd.swoft_order', 'binlog.level', 'replica');
CALL SET_TABLE_PROPERTY('dwd.swoft_order', 'binlog.ttl', '2592000');  -- 30 days

COMMIT;




-- Recreate tmp.sf_order_api, keyed by (appid, ostr), with row-column storage.
-- IF EXISTS: a bare DROP fails when the table does not exist yet, which aborts the whole
-- transaction, so the CREATE would never run on a fresh database.
BEGIN;
DROP TABLE IF EXISTS tmp.sf_order_api;
CREATE TABLE tmp.sf_order_api (
    appid     bigint NOT NULL,
    ostr      text   NOT NULL,
    pay_fee   double precision,
    is_pay    bigint,
    create_dt text,
    PRIMARY KEY (appid, ostr)
)
WITH (
    orientation = 'row,column'
);
COMMIT;

创建 schema

-- Warehouse layer schemas. IF NOT EXISTS keeps the script re-runnable.
-- `dim` is included because the dimension tables (e.g. dim.dim_swoft_yunying_app_channel)
-- live in it and cannot be created without it.
CREATE SCHEMA IF NOT EXISTS ods;
CREATE SCHEMA IF NOT EXISTS dim;
CREATE SCHEMA IF NOT EXISTS dwd;
CREATE SCHEMA IF NOT EXISTS dws;
CREATE SCHEMA IF NOT EXISTS ads;
CREATE SCHEMA IF NOT EXISTS tmp;

获取建表语句

-- Print the CREATE TABLE scripts (DDL) of two tables.
SELECT hg_dump_script('tmp.sf_order_api');
SELECT hg_dump_script('ods.sf_device');


查询表占用空间大小

-- Storage usage checks; results are in bytes unless wrapped in pg_size_pretty.
-- hologres.hg_relation_size(table, type) takes type 'data' | 'binlog' | 'mv' | 'all'. The
-- binlog size of ods.sf_device is therefore queried with hg_relation_size, like every other
-- check below.
SELECT hologres.hg_relation_size('ods.sf_device', 'binlog');

-- Binlog of the partitioned parent vs. one child partition.
SELECT hologres.hg_relation_size('ods.sf_order_p', 'binlog')          AS binlog_p,
       hologres.hg_relation_size('ods.sf_order_p_20240814', 'binlog') AS binlog_s;
-- Data vs. binlog size of one child partition.
SELECT hologres.hg_relation_size('ods.sf_order_p_20240814', 'data')   AS tdata,
       hologres.hg_relation_size('ods.sf_order_p_20240814', 'binlog') AS binlog;
-- Data vs. binlog size of ods.sf_device.
SELECT hologres.hg_relation_size('ods.sf_device', 'data')   AS tdata,
       hologres.hg_relation_size('ods.sf_device', 'binlog') AS binlog;

-- Total size (all storage types): raw bytes, then human-readable.
SELECT hologres.hg_relation_size('ods.sf_order', 'all');

SELECT pg_size_pretty(hologres.hg_relation_size('ods.sf_order', 'all'));
SELECT pg_size_pretty(hologres.hg_relation_size('ods.sf_order_p', 'all'));


-- Destructive test operations on ODS tables; run deliberately.
-- Empty ods.sf_device completely.
TRUNCATE TABLE ods.sf_device;

-- Row count per creation day (create_time is epoch seconds, formatted in the session time zone).
SELECT TO_CHAR(TO_TIMESTAMP(create_time), 'YYYYMMDD'), COUNT(*)
FROM ods.sf_order
GROUP BY TO_CHAR(TO_TIMESTAMP(create_time), 'YYYYMMDD');

-- Keep only orders created on 2024-08-27. Rows whose create_time is NULL survive, because
-- the <> comparison yields NULL for them.
DELETE FROM ods.sf_order
WHERE TO_CHAR(TO_TIMESTAMP(create_time), 'YYYYMMDD') <> '20240827';


dim 建表 dim.dim_swoft_yunying_app_channel

-- Create the app/channel dimension table if it does not exist yet (non-destructive: no DROP),
-- keyed by (appid, channel), with row-column storage and a 30-day 'replica' binlog.
-- The table properties must target the table created in this transaction; pointing them at
-- another table (e.g. ods.sf_device) would alter that table, or fail and roll back the CREATE.
-- NOTE(review): setting 'orientation' on an already existing table may be rejected, which
-- would roll back this transaction; run it only when the table is new.
BEGIN;
CREATE TABLE IF NOT EXISTS dim.dim_swoft_yunying_app_channel (
  appid                   BIGINT,
  channel                 TEXT,
  apartment_id            BIGINT,
  apartment_name          TEXT,
  group_id                BIGINT,
  group_name              TEXT,
  project_id              TEXT,
  project_name            TEXT,
  system                  BIGINT,
  system_name             TEXT,
  app_name                TEXT,
  is_overseas             BIGINT,
  type_name               TEXT,
  region                  TEXT,
  invest_income_type      TEXT,
  admin_name              TEXT,
  admin_id                BIGINT,
  invest_way              BIGINT,
  invest_way_name         TEXT,
  platform_id             BIGINT,
  platform_name           TEXT,
  two_platform_id         BIGINT,
  two_platform_name       TEXT,
  type_desc               TEXT,
  agent_name              TEXT,
  channel_remarks         TEXT,
  business_group_id       TEXT,
  business_group_name     TEXT,
  channel_timeadd         TEXT,
  is_master               TEXT,
  invest_target           TEXT,
  channel_type            TEXT,
  channel_sort            TEXT,
  invest_way_sort         TEXT,
  PRIMARY KEY (appid, channel)
);
CALL SET_TABLE_PROPERTY('dim.dim_swoft_yunying_app_channel', 'orientation', 'row,column');
CALL SET_TABLE_PROPERTY('dim.dim_swoft_yunying_app_channel', 'binlog.level', 'replica');
CALL SET_TABLE_PROPERTY('dim.dim_swoft_yunying_app_channel', 'binlog.ttl', '2592000');  -- 30 days

COMMIT;

HOLOGRES + FLINK 全托管实时数仓调测试记录
https://www.zwenooo.link/posts/published/bigdata/flink-holo-test/flink-holo-test/
作者
zwen
发布于
2024-08-20
许可协议
CC BY-NC-SA 4.0