Author: 郑文峰
一些组件的常用命令。
./kafka-topics.sh --zookeeper $zookeeper_ip:$zookeeper_port --create --topic $topic --replication-factor $replication_factor --partitions $partitions
创建topic
kafka-topics.sh --list --zookeeper $zookeeper_ip:$zookeeper_port
获取所有的topic
参数 | 作用 |
---|---|
--replication-factor | 副本数 |
--partitions | partitions数 |
--create | 创建topic |
--delete | 删除topic |
kafka-console-consumer.sh --group $group --bootstrap-server $kafka_host:$kafka_port --topic $topic
使用消费者组group对topic进行消费
kafka-console-consumer.sh --consumer.config /tmp/client.properties --group $group --bootstrap-server $kafka_host:$kafka_port --topic $topic
以认证的方式进行消费
security.protocol=SASL_PLAINTEXT
sasl.mechanism=SCRAM-SHA-256
client.properties 配置文件
参数 | 作用 |
---|---|
--group | 消费者组名称 |
--bootstrap-server | kafka broker的地址 |
--topic | 需要消费的topic |
--from-beginning | 从开头开始消费 |
kafka-console-producer.sh --broker-list $host:$port --topic $topic
对topic进行生产数据. 该方式为未认证方式
kafka-console-producer.sh --producer.config /tmp/client.properties --broker-list $host:$port --topic $topic
以认证的方式连接kafka并生产数据.
kafka-acls.sh --authorizer-properties zookeeper.connect=$zookeeper_host:$zookeeper_port --add --allow-principal User:$username --allow-host IP:$ip --topic $topic
添加对topic的acl限制,仅允许username和来源ip访问
kafka-acls.sh --authorizer-properties zookeeper.connect=$zookeeper_host:$zookeeper_port --list
参数 | 作用 |
---|---|
--allow-principal | 允许访问的用户. |
--allow-host | 允许访问的ip |
--add | 添加acl |
--remove | 删除acl |
--list | 查看acl |
--topic | 指定topic进行acl限制 |
kafka-consumer-groups.sh --bootstrap-server $kafka_host:$kafka_port --describe --group $group
可以查看消费者消费topic的情况,包含当前的offset和topic的最新offset等
kafka-consumer-groups.sh--bootstrap-server $kafka_host:$kafka_port --delete --group $group
删除消费者组
kafka-consumer-groups.sh--bootstrap-server $kafka_host:$kafka_port --list
查看所有消费者组
创建kafka_jaas.conf
KafkaClient {
org.apache.kafka.common.security.scram.ScramLoginModule required
username="kafka_user"
password="kafka_password";
};
创建client.properties
Client {
org.apache.kafka.common.security.scram.ScramLoginModule required
username="zookeeperUser"
password="zookeeperPassword";
};
创建环境变量,指定kafka_jaas.conf文件的绝对路径
export KAFKA_OPTS="-Djava.security.auth.login.config=/tmp/kafka_jaas.conf"
消费
kafka-console-consumer.sh --consumer.config ./client.properties --bootstrap-server ${kafka_server}:9092 --topic ${topic}
生产
kafka-console-producer.sh --producer.config ./client.properties --broker-list ${kafka_server}:9092 --topic ${topic}
其他命令
kafka-consumer-groups.sh --command-config ./client.properties --bootstrap-server ${kafka_server}:9092 --describe --group ${group_id}
命令 | 作用 |
---|---|
install | 安装helm |
upgrade | 更新helm,其参数基本与install相同 |
参数 | 作用 |
---|---|
--version | helm包版本号 |
-n | namespace |
-f | 参数文件 |
--dry-run | 不会真正执行操作 |
--debug | debug模式 |
helm install helm名称 helm包名称 --version="helm包版本号" -n namespace -f values.yaml
指定namespace使用values.yaml作为参数安装helm
helm install --dry-run --debug helm名称 helm包名称 --version="helm包版本号" -n namespace -f values.yaml
通过添加--dry-run --debug
可以对安装helm进行测试验证其中的语法和配置。
查看租户
./pulsar-admin --admin-url http://${pulsar_url}:${pulsar_admin_port} tenants
创建租户
./pulsar-admin --admin-url http://${pulsar_url}:${pulsar_admin_port} tenants create ${tenant_name}
查看namespace
./pulsar-admin --admin-url http://${pulsar_url}:${pulsar_admin_port} namespaces
创建namespace
./pulsar-admin --admin-url http://${pulsar_url}:${pulsar_admin_port} namespaces create ${tenant_name}/${namespace_name}
创建topic
./pulsar-admin --admin-url http://${pulsar_url}:${pulsar_admin_port} topics create persistent://${tenant_name}/${tenant_name}/${topic_name}
./pulsar-admin topics partitioned-stats persistent://${tenant_name}/${tenant_name}/${topic_name} --per-partition
参数 | 作用 |
---|---|
msgRateIn | 生产的速率 |
msgRateOut | 被消费的速率 |
subscriptions | 该topic的订阅者 |
msgBacklog | 订阅者堆积数 |
partitions | partitions数 |
./pulsar-client pulsar://${pulsar_url}:${pulsar_port} produce -m "${message}" persistent://${tenant_name}/${tenant_name}/${topic_name}
参数 | 作用 |
---|---|
-m | 生产的消息 |
./pulsar-client pulsar://${pulsar_url}:${pulsar_port} consume -s "${consumer_name}" persistent://${tenant_name}/${tenant_name}/${topic_name} -n 0
参数 | 作用 |
---|---|
-s | 消费者名称 |
-n | 永不停止的消费 |