基于 Databend 实时计算实现
今天这篇文章和大家交流一下在 Databend 如何实现近实时计算这个能力。 该功能目前在客户应用场景:
- 游戏用户的实时激励,玩游戏中的奖励
- 游戏客户一场游戏结束后,下一场如何匹配队友
- 游戏结束后,本场游戏的数值展示等场景
- 数据实时推送到远端
基于 Databend 有实时计算,在实际客户落地的场景中替换大量的 flink 任务,大大简化了用户的流计算管理方面的开销。
1. 整体的架构
在 Databend 中通过注册外部 UDF , 利用 Databend Stream 获取到指表的增量,把增量传递给 UDF,利用外部 UDF 实现逻辑封装数据推送给指定目标数据。
调度实现
在 Databend Cloud 中,可以使用内置的 task 实现任务调度
https://docs.databend.cn/sql/sql-commands/ddl/task/ddl-create_task/
非 Databend Cloud 环境下,可以使用定时任务或公司内部的大数据平台的调度任务进行调度。
2. 例子: Databend 获取数据后写入 MySQL 中
以 Databend 中加载的数据为例,通过 Stream 获取增量后,将数据推送到 MySQL 中对应的表中。
安装环境依赖
pip install databend-udf mysql-connector-python
在 MySQL 中创建表
MySQL 所在机器: 192.168.1.100 端口: 3306
use wubx;
create table t01(id int not null, c1 varchar(200), primary key(id));
grant all privileges on wubx.* to 'wubx'@'%' identified by 'wubxwubx';
udf server
push_mysql_udf.py
位于 192.168.1.201 上
from databend_udf import *
import json
import mysql.connector
from mysql.connector import pooling
db_config={
'user':'wubx',
'password':'wubxwubx',
'host':'192.168.1.100',
'database':'wubx'
}
pool=mysql.connector.pooling.MySQLConnectionPool(
pool_name="mypool",
pool_size=5,
**db_config
)
# 将数据插入 MySQL 的 UDF
@udf(input_types=["STRING","variant"], result_type="STRING")
def push_mysql_v2(tb_name: str, row: any) -> str:
try:
# 从连接池中获取连接
conn = pool.get_connection()
cursor = conn.cursor()
# print(row)
# 解析 JSON 数据
row_data = row
# 获取表名和字段值
id = row_data.get("id")
c1 = row_data.get("c1")
# 构建插入的 SQL 语句
sql = f"INSERT INTO {tb_name} (id, c1) VALUES (%s, %s)"
cursor.execute(sql, (id, c1))
conn.commit()
return "Success"
except mysql.connector.Error as e:
return f"Database error: {str(e)}"
except json.JSONDecodeError:
return "Invalid JSON format"
finally:
# 确保连接和游标被关闭
if cursor:
cursor.close()
if conn:
conn.close()
# 注册 UDF
if __name__ == "__main__":
# 启动 UDF 服务器
server = UDFServer("0.0.0.0:8815")
# 注册加密和解密函数
server.add_function(push_mysql_v2)
# 启动服务
server.serve()
启动 UDF server
python3 push_mysql_udf.py
注册 UDF
在对应的 databend-query 配置文件中: databend-query.toml 全局部分添加
# udf server
enable_udf_server = true
udf_server_allow_insecure = true
udf_server_allow_list = ['http://192.168.1.201:8815']
这里主要出于安全考虑,不允许随意注册 udf server 而做的限制。
CREATE or replace FUNCTION push_mysql(string, variant) RETURNS STRING LANGUAGE PYTHON HANDLER = 'push_mysql_v2' ADDRESS = 'http://192.168.1.201:8815';
验证 UDF 工作正常
select push_mysql('t01',json_object('id',1,'c1','databend'));
┌───────────────────────────────────────────────────────────┐
│ push_mysql('t01', json_object('id', 1, 'c1', 'databend')) │
│ Nullable(String) │
├───────────────────────────────────────────────────────────┤
│ Success │
└───────────────────────────────────────────────────────────┘
1 row read in 0.015 sec. Processed 1 row, 1 B (66.67 rows/s, 66 B/s)
查看目标 mysql 中对应的 t01 中可以看到对应的数据
MySQL [wubx]> select * from t01;
+----+----------+
| id | c1 |
+----+----------+
| 1 | databend |
+----+----------+
1 row in set (0.000 sec)
可以看到数据已经推送到 MySQL 中。
在 Databend 创建对表的 t01
create or replace table t01 (id int, c1 varchar);
create or replace stream stream_t01_2mysqlt01 on table t01;
stream 功能是一个企业版特性,如果你在测试中需要该功能,可以添加最后面的微信索要企业版授权。 或是直接添加 小D: Databend 获取支持。
写入数据
insert into t01 values(3,'MySQL');
insert into t01 values(4,'Databend');
select * from stream_t01_2mysqlt01;
┌────────────────────────────────────────────────────────────────────────────────────────────────────────────────┐
│ id │ c1 │ change$action │ change$is_update │ change$row_id │
│ Nullable(Int32) │ Nullable(String) │ String │ Boolean │ Nullable(String) │
├─────────────────┼──────────────────┼───────────────┼──────────────────┼────────────────────────────────────────┤
│ 3 │ MySQL │ INSERT │ false │ 0196aac59f937165bd08a69346430194000000 │
│ 4 │ Databend │ INSERT │ false │ 0196aac59fcf7bffb94ecc815b5272ef000000 │
└────────────────────────────────────────────────────────────────────────────────────────────────────────────────┘
2 rows read in 0.035 sec. Processed 2 rows, 49 B (57.14 rows/s, 1.37 KiB/s)
数据推到 MySQL
select push_mysql('t01',json_object('id',id,'c1',c1)) from stream_t01_2mysqlt01 with consume;
root@localhost:8000/default> select push_mysql('t01',json_object('id',id,'c1',c1)) from stream_t01_2mysqlt01 with consume;
┌────────────────────────────────────────────────────┐
│ push_mysql('t01', json_object('id', id, 'c1', c1)) │
│ Nullable(String) │
├────────────────────────────────────────────────────┤
│ Success │
│ Success │
└────────────────────────────────────────────────────┘
2 rows read in 0.057 sec. Processed 2 rows, 49 B (35.09 rows/s, 859 B/s)
# 消费完后对应的 stream 中查不到数据
root@localhost:8000/default> select * from stream_t01_2mysqlt01;
0 row read in 0.029 sec. Processed 0 row, 0 B (0 row/s, 0 B/s)
验证数据推送查看
MySQL [wubx]> select * from t01;
+----+----------+
| id | c1 |
+----+----------+
| 1 | databend |
| 3 | MySQL |
| 4 | Databend |
+----+----------+
3 rows in set (0.000 sec)
可以看到后面的两条记录已经推送过来。 上面的演示属于一个非常简单的演示,对于 update ,delete 如果需要处理,需要把对 stream 中的 action 也传入 udf 中,来做相应的处理, 同理 UDF 的逻辑也可以考虑进一步优化。
3. 总结
通过上述例子,我们演示了如何通过 UDF 实现 Databend 与外部数据系统的联动。该方案已经在多个游戏类客户中成功替换了大量的 Flink 实时任务,显著简化了流计算管理的开销。此外,我们还展示了如何在 Databend 中创建和调度任务,以及如何通过 UDF 将数据实时推送到 MySQL 中。对于更复杂的场景,如处理 update 和 delete 操作,可以通过进一步优化 UDF 逻辑来实现。这一方案不仅提高了数据处理的效率,还降低了系统的复杂性和维护成本。
💬 社区支持
有问题与我们的团队联系:Slack微信:82565387