kafka_fdw

Kafka外部数据源包装器

概览

扩展包名版本分类许可证语言
kafka_fdw0.0.3FDWPostgreSQLC
ID扩展名BinLibLoadCreateTrustReloc模式
8730kafka_fdw-
相关扩展pgmq mongo_fdw redis_fdw wrappers multicorn redis hdfs_fdw wal2json

版本

类型仓库版本PG 大版本包名依赖
EXTPIGSTY0.0.31817161514kafka_fdw-
RPMPIGSTY0.0.31817161514kafka_fdw_$v-
DEBPIGSTY0.0.31817161514postgresql-$v-kafka-fdw-
OS / PGPG18PG17PG16PG15PG14
el8.x86_64
el8.aarch64
el9.x86_64
el9.aarch64
el10.x86_64
el10.aarch64
d12.x86_64
d12.aarch64
d13.x86_64
PIGSTY 0.0.3
PIGSTY 0.0.3
PIGSTY 0.0.3
PIGSTY 0.0.3
PIGSTY 0.0.3
d13.aarch64
PIGSTY 0.0.3
PIGSTY 0.0.3
PIGSTY 0.0.3
PIGSTY 0.0.3
PIGSTY 0.0.3
u22.x86_64
PIGSTY 0.0.3
PIGSTY 0.0.3
PIGSTY 0.0.3
PIGSTY 0.0.3
PIGSTY 0.0.3
u22.aarch64
PIGSTY 0.0.3
PIGSTY 0.0.3
PIGSTY 0.0.3
PIGSTY 0.0.3
PIGSTY 0.0.3
u24.x86_64
PIGSTY 0.0.3
PIGSTY 0.0.3
PIGSTY 0.0.3
PIGSTY 0.0.3
PIGSTY 0.0.3
u24.aarch64
PIGSTY 0.0.3
PIGSTY 0.0.3
PIGSTY 0.0.3
PIGSTY 0.0.3
PIGSTY 0.0.3

构建

您可以使用 pig build 命令构建 kafka_fdw 扩展的 RPM / DEB 包:

pig build pkg kafka_fdw         # 构建 RPM / DEB 包

安装

您可以直接安装 kafka_fdw 扩展包的预置二进制包,首先确保 PGDGPIGSTY 仓库已经添加并启用:

pig repo add pgsql -u          # 添加仓库并更新缓存

使用 pig 或者是 apt/yum/dnf 安装扩展:

pig install kafka_fdw;          # 当前活跃 PG 版本安装
pig ext install -y kafka_fdw -v 18  # PG 18
pig ext install -y kafka_fdw -v 17  # PG 17
pig ext install -y kafka_fdw -v 16  # PG 16
pig ext install -y kafka_fdw -v 15  # PG 15
pig ext install -y kafka_fdw -v 14  # PG 14
dnf install -y kafka_fdw_18       # PG 18
dnf install -y kafka_fdw_17       # PG 17
dnf install -y kafka_fdw_16       # PG 16
dnf install -y kafka_fdw_15       # PG 15
dnf install -y kafka_fdw_14       # PG 14
apt install -y postgresql-18-kafka-fdw   # PG 18
apt install -y postgresql-17-kafka-fdw   # PG 17
apt install -y postgresql-16-kafka-fdw   # PG 16
apt install -y postgresql-15-kafka-fdw   # PG 15
apt install -y postgresql-14-kafka-fdw   # PG 14

创建扩展

CREATE EXTENSION kafka_fdw;

用法

README

kafka_fdw 是一个将 Kafka 消息暴露为 PostgreSQL 外部表的外部数据包装器。上游 README 明确提醒,这个项目尚未达到生产级就绪状态。

服务器与映射

先定义带 Kafka broker 列表的外部服务器,再创建用户映射:

CREATE EXTENSION kafka_fdw;

CREATE SERVER kafka_server
FOREIGN DATA WRAPPER kafka_fdw
OPTIONS (brokers 'localhost:9092');

CREATE USER MAPPING FOR PUBLIC SERVER kafka_server;

外部表

Kafka 外部表必须包含两个元数据列,一个标记为 partition 'true',另一个标记为 offset 'true'。其余列用于描述消息负载。

CSV 消息

CREATE FOREIGN TABLE kafka_test (
    part int OPTIONS (partition 'true'),
    offs bigint OPTIONS (offset 'true'),
    some_int int,
    some_text text,
    some_date date,
    some_time timestamp
)
SERVER kafka_server
OPTIONS (
    format 'csv',
    topic 'contrib_regress',
    batch_size '30',
    buffer_delay '100'
);

对于 CSV,列按位置映射。上游说明,字段校验强度取决于消息写入方,因此在数据质量不稳定时,严格解析和 junk 处理选项很重要。

JSON 消息

CREATE FOREIGN TABLE kafka_test_json (
    part int OPTIONS (partition 'true'),
    offs bigint OPTIONS (offset 'true'),
    some_int int OPTIONS (json 'int_val'),
    some_text text OPTIONS (json 'text_val'),
    some_date date OPTIONS (json 'date_val'),
    some_time timestamp OPTIONS (json 'time_val')
)
SERVER kafka_server
OPTIONS (
    format 'json',
    topic 'contrib_regress_json',
    batch_size '30',
    buffer_delay '100'
);

对于 JSON,每个列都可以通过 json 选项映射到对象键。当前实现支持 JSON 对象,不支持顶层 JSON 数组。

查询与写入

偏移量列和分区列是特殊列,上游 README 建议在查询中尽可能显式指定它们:

SELECT * FROM kafka_test WHERE part = 0 AND offs > 1000 LIMIT 60;

SELECT *
FROM kafka_test
WHERE (part = 0 AND offs > 100)
   OR (part = 1 AND offs > 300)
   OR (part = 3 AND offs > 700);

也可以通过 INSERT 发送消息。如果指定了分区值,就使用该分区;否则由 Kafka 内置分区器决定:

INSERT INTO kafka_test(part, some_int, some_text)
VALUES
    (0, 5464565, 'some text goes into partition 0'),
    (NULL, 5464565, 'some text goes into partition selected by kafka');

错误处理

默认行为较为宽松:

  • 缺失尾部列会视为 NULL
  • 多余字段会被忽略
  • 但无法解析的值默认仍会报错

相关表选项和辅助列包括:

  • strict 'true',拒绝列数不匹配
  • ignore_junk 'true',将格式错误的值设为 NULL
  • 标记为 junk 'true' 的列,用于捕获原始负载
  • 标记为 junk_error 'true' 的列,用于捕获解析错误

构建说明

该扩展使用 librdkafka,上游构建步骤很标准:

make && make install
make installcheck

测试环境假定 Kafka 运行在 localhost:9092,ZooKeeper 运行在 localhost:2181


最后修改 2026-04-10: extension update (13b4540)