# Kafka输入组件使用说明
# 组件说明
Kafka输入组件用于从 Apache Kafka 数据库中读取数据。
# 配置项说明
配置名称 | 数据类型 | 是否必须 | 默认值 | 描述 |
---|---|---|---|---|
节点名称 | String | 是 | - | 当前创建的节点名称,由用户自定义且不可为空。命名可包含字母、数字、下划线。 |
选择数据源 | String | 是 | - | 当前输入绑定的数据源名称,从下拉选项中列出的指定的关联类型(Kafka)的数据源进行选择。 |
topic | String | 是 | - | Kafka 主题名称。 |
消费组名称 | String | 是 | - | Kafka 消费组名称 。 |
超时时间(ms) | Integer | 是 | 300000 | 读取超时时间。 |
启用超时终止 | Boolean | 是 | false | 超时终止的开关。如果启用则超时后会终止运行。 |
字段名称 | String | 是 | KafkaMessageName | 要读取的字段名称。 |
键值 | String | 是 | - | 通过键值设置更多的参数。 |
# FAQ
Q: 当使用的kafka需要认证时,DWS模型和DI Server要怎么配置?
A:不同的认证方式,使用的配置项不同,具体如下:
选择 SSL+JKS 双向认证的kafka数据源时:
步骤1. IDE中,kafka输入和kafka输出组件会自动增加以下配置,其中
ssl.keystore.location
和ssl.keystore.location
需要修改为步骤2中jks文件的路径:security.protocol = SSL ssl.keystore.location = ${DI_HOME}/ssl/{datasourceCode}/{envType}/kafka-keystore.jks ssl.keystore.password = keystore-password ssl.key.password = key-password ssl.truststore.location = ${DI_HOME}/ssl/{datasourceCode}/{envType}/kafka-truststore.jks ssl.truststore.password = truststore-password ssl.client.auth = required ssl.endpoint.identification.algorithm = #设置值为一个空格 ssl.client.auth = required
参数说明:
security.protocol
:设置为SSL
,表示使用 SSL/TLS 协议。ssl.truststore.location
:信任库(TrustStore)文件的路径(JKS 或 PEM 格式)。ssl.truststore.location
:信任库(TrustStore)文件的路径(JKS 或 PEM 格式)。ssl.keystore.location
:密钥库(KeyStore)文件的路径(JKS 或 PEM 格式)。ssl.keystore.password
:密钥库的密码。ssl.key.password
:私钥的密码(如果与密钥库密码不同则设置;如果与密钥库密码相同则不添加此参数)。ssl.client.auth
有以下三种配置选项: none:(默认值):服务器不要求客户端提供证书。仅服务器向客户端提供证书,客户端验证服务器的证书。
requested:服务器请求客户端提供证书,但即使客户端未提供证书,连接仍然可以建立。这是一种宽松的双向认证模式。
required:服务器要求客户端必须提供有效证书。如果客户端未提供证书或证书无效,连接将被拒绝。这是一种严格的双向认证模式。
ssl.endpoint.identification.algorithm
:设置端点验证算法,通常为HTTPS
或留空。这里设置为一个空格
步骤2. DI Server需要在${DI_HOME}/ssl/{datasourceCode}/{envType}/目录下放置kafka-keystore.jks、kafka-truststore.jks认证文件
注意:
DI_HOME 即Primeton DI安装目录
datasourceCode为模型中使用DWS系统中的kafka数据源编码
envType为环境类型,如dev、test、pro
ssl及下层目录不存在时,可手动创建
文件夹层级格式不可修改,每层都必须有
选择SASL_PLAINTEXT 认证,且认证机制为SCRAM-SHA-256或SCRAM-SHA-512的kafka数据源时:
IDE中,kafka输入和kafka输出组件会自动增加以下配置;DI Server不需要其他配置:
security.protocol = SASL_PLAINTEXT sasl.mechanism = SCRAM-SHA-256 或 SCRAM-SHA-512 sasl.jaas.config = org.apache.kafka.common.security.scram.ScramLoginModule required username="<your_username>" password="<your_password>";
参数说明:
security.protocol
:设置为SASL_PLAINTEXT
,使用 SASL 认证,但数据传输不加密(明文传输)。sasl.mechanism
:指定 SASL 认证机制,SCRAM-SHA-256
:使用 SCRAM-SHA-256 算法认证;SCRAM-SHA-512
:使用 SCRAM-SHA-512 算法认证。sasl.jaas.config
:配置 SASL 认证的用户名和密码。选择kerberos 认证方式的kafka数据源时:
- IDE中,kafka输入和kafka输出组件会自动增加以下配置:
security.protocol = SASL_PLAINTEXT sasl.mechanism = GSSAPI
- DI Server需要在 ${DI Server}安装目录/diserver/kerberos 目录下放置krb5.conf、krb5.keytab、jaas.conf和config.properties文件。
jaas.conf
文件内容:KafkaClient { com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true useTicketCache=false serviceName="kafka" keyTab="/path/to/krb5.keytab" #krb5.keytab文件存放路径 principal="kafka/server@KAFKA.COM"; #kafka票据信息 };
config.properties
文件内容:username.client.kerberos.principal=kafka/server@KAFKA.COM #kafka票据信息
krb5.conf
和krb5.keytab
文件从kafka服务器获取。以上文件修改后,需要重启DI Server服务。