模块职责
- 负责 topic 管理的类为 TopicConfigManager,提供了TopicConfig 的 CRUD 功能
- TopicConfigManager 继承自 ConfigManager,具有
load
和persist
功能
关键细节
- 内置 topics
- SYS_SELF_TEST
- AUTO_CREATE_TOPIC_KEY 默认 topic 配置的 key,当自动创建 topic 开关打开后,会用这个 topic 的配置作为 topic 的默认配置
- SYS_BENCHMARK
- clusterName
- brokerName
- SYS_OFFSET_MOVED
- SYS_SCHEDULE:用于暂存待发送的延时消息
- Trace Topic
- Replace Topic:
ClusterName + REPLAY_TOPIC_POSTFIX
- 内存数据结构:
ConcurrentHashMap
,key 为 topicName,value 为 TopicConfig
处理流程梳理
createTopicInSendMessageMethod
: 在AbstractSendMessageProcessor#msgCheck
中调用,当根据当前 request 的 topic 属性选择 Config 失败时,会调用此方法graph TD s((start)) --> A[尝试锁住整个 topicConfigTable] A --> B{加锁成功?} B --> |N|C[返回 null ] C --> C1((end)) B --> |Y|D[根据 topic 获取一次 TopicConfig] D --> D1{TopicConfig 已存在?} D1 --> |Y|D2[返回 TopicConfig] D2 --> C1 D1 --> |N|E[根据 defaultTopic 获取 TopicConfig] E --> F{defaultTopicConfig 存在?} F --> |Y|F1{defaluTopic 是 AUTO_CREATE_TOPIC_KEY_TOPIC?} F1 --> |Y|F11{自动创建 topic 开关打开?} F11 --> |Y|F111[将 defaultTopicConfig 权限设为读写] F --> |N|G[topicConfig 创建失败并打印 log] F111 --> F2{producer 有 defaultTopic 的权限?} F2 --> |Y|F21[新建 TopicConfig] F2 --> |N|G F21 --> H[TopicConfig 加入到内存 topicConfigTable] H --> I[topicConfigTable 版本号自增] I --> J[topicConfigTable 持久化到本地文件] J --> D2 G --> C
createTopicInSendMessageBackMethod
:在SendMessageProcessor#asyncCOnsumerSendMessageBack
方法中调用graph TD s((start)) --> A[尝试锁住整个 topicConfigTable] A --> B{加锁成功?} B --> |N|B1[返回 null ] B1 --> B11((end)) B --> |Y|B2[根据 topic 获取 TopicConfig] B2 --> B3{TopicConfig != null ?} B3 --> |Y|B31[返回 TopicConfig] B31 --> B11 B3 --> |N|B4[新建 TopicConfig] B4 --> B5[TopicConfig 加入到内存 topicConfigTable] B5 --> B6[topicConfigTable 版本自增] B6 --> B7[topicConfigTable 持久化到本地文件] B7 --> B31
createTopicOfTranCheckMaxTime
:实现逻辑和createTopicInSendMessageBackMethod
一致,区别是createTopicOfTranCheckMaxTime
中的 topic 是RMQ_SYS_TRANS_CHECK_MAX_TIME_TOPIC
,是一个内置的事务消息专用 topicupdateTopicConfig
/deleteTopicConfig
修改或删除某个 TopicConfig,基本逻辑相同,分为如下 3 步graph LR A((start)) --> B[修改内存 topicConfigTable] B --> C[数据版本自增] C --> D[变更持久化到本地文件] D --> E((end))
总结
- RMQ 的 TopicConfigManager 由
内存 ConrrentHashMap + 版本号 + 持久化文件
组成 - TopicConfig 每次变更(新增、更新、删除)都会让版本号自增,版本号由
时间戳 + AtomicLong
组成 - TopicConfigManager 支持从文件加载初始配置,逻辑和
ConfigManager
相同