PipelineDB 实践

目前公司采用Shell、Java、Storm等方式进行实时数据的处理。后两者开发成本较高,因此寻找新的替代方案,使得我们能够快速开发迭代实时需求。

基础使用

连接pipelinedb

# 设置密码环境变量
export PGPASSWORD='password'

# 登录PipelineDB
psql -h pg1922 -p 1922 -U username -d pipeline "$@"

命令帮助

# psql命令帮助
\?
# SQL命令帮助
\h

列出Database

\l 或 \l+

创建Schema

CREATE SCHEMA dw_bihell AUTHORIZATION username;

列出Schema

\dn 或 \dn+

切换Schema

SET search_path TO dw_bihell;

列出表、视图等

# 默认shema下的
\d 或 \d+
# 指定shema
\dp [PATTERN] 或 \z [PATTERN]
比如 \z dw_order.*

Streams

streams可以向Continuous Views推送数据,一个stream或称为事件(event),看着跟常规的表行一样,而且数据写入stream和写入表的接口完全相同,但是steam跟表有着根本的不同。

简单的说,存在于steam中的事件在被所有Continuous Views消费以后就会’消失’,无法被用户通过select语句查询到,即steam专门作为Continuous Views的数据输入源而存在。这跟Storm中的Spout很相似。

创建STREAM

本文实例将接收kafka的数据

1
2
3
4
5
6
7
8
-- 语法
CREATE STREAM stream_name ( [
{ column_name data_type [ COLLATE collation ] | LIKE parent_stream } [, ... ]
] )
-- 可以直接支持json数据
CREATE STREAM dw_bihell.rt_oreder_bihell (log json);
-- 或者直接接收文本(kafka发数据的时候根据分隔符分割行)
CREATE STREAM dw_bihell.rt_oreder_bihell (collect_date text,record_status integer,operate_type integer,update_mask integer,order_date text,bill_date text,order_id bigint,order_type)

STREAM 增加字段

ALTER STREAM stream ADD COLUMN x integer;

删除STREAM

DROP STREAM

查询已创建STREAM

SELECT * FROM pipeline_streams() ORDER BY schema;

创建Kafka配置

PipelineDB有个Kafka插件pipeline_kafka使得它可以获取kafka的消息。

添加kafak broker

pipeline_kafka.add_broker (<host>[:<port>])

查看pipeline_kafka配置信息

有个专门的pipeline_kafka schema可以看到目前配置的信息

SET search_path TO pipeline_kafka;
\d+

消费Kafka

语法:

pipeline_kafka.consume_begin ( topic text, stream text, format := ‘text’, delimiter := E’\t’, quote := NULL, escape := NULL, batchsize := 1000, maxbytes := 32000000, parallelism := 1, start_offset := NULL )

实例:

1
2
3
4
-- 直接传json
SELECT pipeline_kafka.consume_begin ('bihell', 'dw_bihell.rt_oreder_bihell', format := 'json', start_offset := -2);
-- 传文本,并指定分隔符
select pipeline_kafka.consume_begin ('ordermaster','dw_order.rt_stream_order_master_eb', format := 'text', delimiter := E'\u0001')

参考

PipelineDB Documentation
MySQL to PostgreSQL Types Mapping

打赏支持:如果你觉得我的文章对你有所帮助,可以打赏我哟。