RocketMQ ACL

业务需要在同一网络下部署多套 RocketMQ 供多租户使用,并且需要做 ACL 以防止访问非本租户的 RocketMQ。

现状

当前版本为 4.3.0 ,RocketMQ 在 4.4.0 版本开始支持ACL,并且在 4.5.2 版本对ACL做了一些修复以及改进,因此本次需升级RocketMQ 到 4.5.2 版本。

  • ISSUE-1078 - Fixed the issue that User can’t use mqadmin command normally if they don’t copy the tool.yml file to related fold and AclEnable flag is closed.
  • ISSUE-1147 - Fixed the issue that broker will report exception if open the aclEnable and enableDLegerCommitLog flag at the same time.
  • ISSUE-1156 - Add new mqadmin API for ACL configuration.
  • ISSUE-1290 - Support matching some acl ip range.

ACL介绍

详细配置请参考官方文档

什么是ACL

ACL是access control list的简称,俗称访问控制列表。访问控制,基本上会涉及到用户、资源、权限、角色等概念,那在RocketMQ中上述会对应哪些对象呢?

  • 用户:用户是访问控制的基础要素,也不难理解,RocketMQ ACL必然也会引入用户的概念,即支持用户名、密码。
  • 资源:资源,需要保护的对象,在RocketMQ中,消息发送涉及的Topic、消息消费涉及的消费组,应该进行保护,故可以抽象成资源。
  • 权限:针对资源,能进行的操作,
  • 角色:RocketMQ中,只定义两种角色:是否是管理员。

基本流程

20230526171338

  1. 先将对应的权限控制属性(包括Topic访问权限、IP白名单和AccessKey和SecretKey签名等)设置在distribution/conf/plain_acl.yml的配置文件中。
  2. Client客户端通过 RPCHook 注入AccessKey和Signature(根据SecretKey 签名得到的字符串);
  3. Broker端对AccessKey所拥有的权限进行解析&校验,校验不过,抛出异常;

权限校验

Broker端对权限的校验逻辑主要分为以下几步:

  1. 检查是否命中全局 IP 白名单;如果是,则认为校验通过;否则走 2;
  2. 检查是否命中用户 IP 白名单;如果是,则认为校验通过;否则走 3;
  3. 校验签名,校验不通过,抛出异常;校验通过,则走 4;
  4. 对用户请求所需的权限 和 用户所拥有的权限进行校验;不通过,抛出异常;

用户所需权限的校验需要注意已下内容:

  1. 特殊的请求例如 UPDATE_AND_CREATE_TOPIC 等,只能由 admin 账户进行操作;
  2. 对于某个资源,如果有显性配置权限,则采用配置的权限;如果没有显性配置权限,则采用默认的权限;

实操

客户端

发送消息,接收消息的时候指定 RPCHook。

AclClientRPCHook

  1. parseRequestContent 方法内部方法将 request 的自定义头部上面的所有字段的 name 和 value 放入到了一个 SortedMap 中,同时将 ACCESS_KEY 和 SECURITY_TOKEN (如果有)也放入了进去
  2. 将上述所有字段的值拼接城字符串,然后获取字节数组,再与请求本身的 body 的字节数组拼接在一起,获取到最终的 byte[] 数组。
  3. 然后通过 calSignature 方法计算签名,在内部默认采用 SigningAlgorithm.HmacSHA1 算法获取到签名后的 byte[] 数组,再通过 Base64.encodeBase64 将其转为字符串,返回最终的签名。
  4. 生成签名以后,将签名、ACCESS_KEY、SECURITY_TOKEN (如果有) 添加到请求的扩展字段中。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public class AclClientRPCHook implements RPCHook {
@Override
public void doBeforeRequest(String remoteAddr, RemotingCommand request) {
// Add AccessKey and SecurityToken into signature calculating.
request.addExtField(ACCESS_KEY, sessionCredentials.getAccessKey());
// The SecurityToken value is unnecessary,user can choose this one.
if (sessionCredentials.getSecurityToken() != null) {
request.addExtField(SECURITY_TOKEN, sessionCredentials.getSecurityToken());
}
byte[] total = AclUtils.combineRequestContent(request, parseRequestContent(request));
String signature = AclUtils.calSignature(total, sessionCredentials.getSecretKey());
request.addExtField(SIGNATURE, signature);
}

@Override
public void doAfterResponse(String remoteAddr, RemotingCommand request, RemotingCommand response) {

}

...
}

Producer 示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public class Producer {
public static final int MESSAGE_COUNT = 1000;
public static final String PRODUCER_GROUP = "please_rename_unique_group_name";
public static final String DEFAULT_NAMESRVADDR = "127.0.0.1:9876";
public static final String TOPIC = "TopicTest";
public static final String TAG = "TagA";

public static void main(String[] args) throws MQClientException, InterruptedException {
DefaultMQProducer producer = new DefaultMQProducer(PRODUCER_GROUP,getAclRPCHook());
producer.setNamesrvAddr(DEFAULT_NAMESRVADDR);
producer.start();

for (int i = 0; i < MESSAGE_COUNT; i++) {
try {
Message msg = new Message(TOPIC, TAG , ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
} catch (Exception e) {
e.printStackTrace();
Thread.sleep(1000);
}
}
producer.shutdown();
}

public static RPCHook getAclRPCHook() {
return new AclClientRPCHook(new SessionCredentials("rocketmq","12345678"));
}
}

Consumer 示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public class Consumer {

public static final String CONSUMER_GROUP = "please_rename_unique_group_name_4";
public static final String DEFAULT_NAMESRVADDR = "127.0.0.1:9876";
public static final String TOPIC = "TopicTest";

public static void main(String[] args) throws InterruptedException, MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(null, CONSUMER_GROUP, getAclRPCHook());
consumer.setNamesrvAddr(DEFAULT_NAMESRVADDR);
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.subscribe(TOPIC, "*");
consumer.registerMessageListener((MessageListenerConcurrently) (msg, context) -> {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msg);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
consumer.start();
System.out.printf("Consumer Started.%n");
}

public static RPCHook getAclRPCHook() {
return new AclClientRPCHook(new SessionCredentials("rocketmq","12345678"));
}
}

服务端

配置ACL

  1. 升级RocketMQ服务端至4.5.2
  2. broker.conf 开启ACL 增加 aclEnable=true
  3. 配置权限、添加账户、slave节点ip添加到全局白名单
  • 如果ACL与高可用部署(Master/Slave架构)同时启用,那么需要在Broker Master节点的distribution/conf/plain_acl.yml配置文件中 设置全局白名单信息,即为将Slave节点的ip地址设置至Master节点plain_acl.yml配置文件的全局白名单中。

  • 如果ACL与高可用部署(多副本Dledger架构)同时启用,由于出现节点宕机时,Dledger Group组内会自动选主,那么就需要将Dledger Group组 内所有Broker节点的plain_acl.yml配置文件的白名单设置所有Broker节点的ip地址。

由于只需要做租户隔离,这里配置admin=true,细粒度权限请参考官方文档

  • 在 plain_acl.yml 配置

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    # 全局ip白名单
    globalWhiteRemoteAddresses:
    - 10.10.103.*
    - 192.168.0.*

    # 账户
    accounts:
    - accessKey: rocketmq
    secretKey: 12345678
    whiteRemoteAddress: 192.168.1.*
    # if it is admin, it could access all resources
    admin: true
  • mqadmin 命令配置

    1
    2
    3
    4
    5
    6
    7
    8
    # 添加/更行账号
    sh mqadmin updateAclConfig -n 192.168.1.2:9876 -b 192.168.12.134:10911 -a RocketMQ -s 12345678 -m true

    # 删除账号
    sh mqadmin deleteAccessConfig -n 192.168.1.2:9876 -c DefaultCluster -a RocketMQ

    # 更新全局白名单
    $ sh mqadmin updateGlobalWhiteAddr -n 192.168.1.2:9876 -b 192.168.12.134:10911 -g 10.10.154.1,10.10.154.2

    20230526182704

启动命令

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
# 启动 nameserver
# 挂载启动
nohup sh bin/mqnamesrv &
# 查看日志
tail -f ~/logs/rocketmqlogs/namesrv.log

# 启动 broker
nohup sh bin/mqbroker -c conf/broker.conf -n localhost:9876 &
# 查看日志
tail -f ~/logs/rocketmqlogs/broker.log

# 关闭
sh bin/mqshutdown broker
sh bin/mqshutdown namesrv

# 发送消息
export NAMESRV_ADDR=localhost:9876
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer
# 接收消息
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer
# 查看集群状态
sh bin/mqadmin clusterList -n "127.0.0.1:9876"

查看集群状态若权限不对显示如下:

1
2
3
4
5
6
7
8
9
10
11
12
➜  rocketmq sh bin/mqadmin clusterList -n "127.0.0.1:9876"
#Cluster Name #Broker Name #BID #Addr #Version #InTPS(LOAD) #OutTPS(LOAD) #PCWait(ms) #Hour #SPACE
org.apache.rocketmq.client.exception.MQBrokerException: CODE: 1 DESC: org.apache.rocketmq.acl.common.AclException: No acl config for rocketmq222, org.apache.rocketmq.acl.plain.PlainPermissionManager.validate(PlainPermissionManager.java:667) BROKER: 10.94.30.17:10911
For more information, please visit the url, http://rocketmq.apache.org/docs/faq/
at org.apache.rocketmq.client.impl.MQClientAPIImpl.getBrokerRuntimeInfo(MQClientAPIImpl.java:1318)
at org.apache.rocketmq.tools.admin.DefaultMQAdminExtImpl.fetchBrokerRuntimeStats(DefaultMQAdminExtImpl.java:293)
at org.apache.rocketmq.tools.admin.DefaultMQAdminExt.fetchBrokerRuntimeStats(DefaultMQAdminExt.java:244)
at org.apache.rocketmq.tools.command.cluster.ClusterListSubCommand.printClusterBaseInfo(ClusterListSubCommand.java:212)
at org.apache.rocketmq.tools.command.cluster.ClusterListSubCommand.execute(ClusterListSubCommand.java:88)
at org.apache.rocketmq.tools.command.MQAdminStartup.main0(MQAdminStartup.java:149)
at org.apache.rocketmq.tools.command.MQAdminStartup.main(MQAdminStartup.java:100)
DefaultCluster broker-a 0 10.94.30.17:10911 0.00(,ms) 0.00(,ms) 0.00 0.0000

参考

  1. https://rocketmq.apache.org/zh/docs/4.x/bestPractice/04access
  2. https://kunzhao.org/docs/rocketmq/rocketmq-acl/

RocketMQ ACL
https://zhengshuoo.github.io/posts/018-rocketmq-acl
作者
zhengshuo
发布于
2023年5月26日
许可协议