# Compaction Topic

## Usage

### Enable ordered-message support on the namesrv

Compaction topics rely on ordered messages to guarantee consistency.

```shell
$ bin/mqadmin updateNamesrvConfig -k orderMessageEnable -v true
```

### Create a compaction topic

```shell
$ bin/mqadmin updateTopic -w 8 -r 8 -a +cleanup.policy=COMPACTION -n localhost:9876 -t ctopic -o true -c DefaultCluster
create topic to 127.0.0.1:10911 success.
TopicConfig [topicName=ctopic, readQueueNums=8, writeQueueNums=8, perm=RW-, topicFilterType=SINGLE_TAG, topicSysFlag=0, order=false, attributes={+cleanup.policy=COMPACTION}]
```
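
### Produce data

Messages are produced the same way as ordinary messages. The queue selector routes by key, so all messages with the same key go to the same queue.

```java
import java.nio.charset.StandardCharsets;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;

public class CompactionProducerExample {
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("CompactionTestGroup");
        producer.setNamesrvAddr("localhost:9876");
        producer.start();
        String topic = "ctopic";
        String tag = "tag1";
        String key = "key1";
        Message msg = new Message(topic, tag, key, "bodys".getBytes(StandardCharsets.UTF_8));
        // Pick the queue from the key's hash so the same key always maps to the same queue.
        SendResult sendResult = producer.send(msg, (mqs, message, shardingKey) -> {
            int select = Math.abs(shardingKey.hashCode());
            // Math.abs(Integer.MIN_VALUE) is still negative, so clamp it to 0.
            if (select < 0) {
                select = 0;
            }
            return mqs.get(select % mqs.size());
        }, key);
        System.out.printf("%s%n", sendResult);
        producer.shutdown();
    }
}
```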
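
The selector in the producer example sends every message with a given key to the same queue, which is what allows ordering to hold per key. As a minimal standalone sketch of that index calculation, with no RocketMQ dependency, the following may help; the class and method names (`KeyQueueSelector`, `selectIndex`) are illustrative only and are not part of the RocketMQ API.

```java
final class KeyQueueSelector {
    // Hypothetical helper mirroring the lambda passed to producer.send(...).
    // Maps a key to a queue index in the range [0, queueCount).
    static int selectIndex(String key, int queueCount) {
        int hash = Math.abs(key.hashCode());
        // Math.abs(Integer.MIN_VALUE) stays negative, so clamp it to 0.
        if (hash < 0) {
            hash = 0;
        }
        return hash % queueCount;
    }

    public static void main(String[] args) {
        int queues = 8; // matches -w 8 / -r 8 used when the topic was created
        for (String key : new String[] {"key1", "key2", "key1"}) {
            System.out.println(key + " -> queue " + selectIndex(key, queues));
        }
    }
}
```

Both occurrences of `key1` print the same queue index, since the result depends only on the key and the queue count. Changing the number of queues changes the mapping, so the queue count should stay fixed while key-based routing is relied upon.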
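### Consume data

Message offsets stay the same as they were before compaction. When consuming from a specified offset that no longer exists, the broker returns the nearest message after that offset.
In compaction scenarios, most consumers start from offset 0 and read the complete data set.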
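
The offset behaviour described here can be illustrated with a small in-memory model. This is only a sketch under assumptions, not the broker's implementation: it assumes compaction retains the most recent message per key, and the names `CompactionModel`, `Msg`, `compact`, and `read` are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.TreeMap;

final class CompactionModel {
    // A simplified message: only the key and value matter for this model.
    static final class Msg {
        final String key;
        final String value;
        Msg(String key, String value) { this.key = key; this.value = value; }
        @Override public String toString() { return key + "=" + value; }
    }

    // Keeps only the newest message per key; survivors keep their original offsets.
    static TreeMap<Long, Msg> compact(TreeMap<Long, Msg> log) {
        Map<String, Long> latestOffset = new LinkedHashMap<>();
        // TreeMap iterates in ascending offset order, so later offsets overwrite earlier ones.
        log.forEach((offset, msg) -> latestOffset.put(msg.key, offset));
        TreeMap<Long, Msg> compacted = new TreeMap<>();
        latestOffset.values().forEach(offset -> compacted.put(offset, log.get(offset)));
        return compacted;
    }

    // Returns the message at the offset, or the nearest later one if that offset
    // was compacted away; returns null when nothing exists at or after the offset.
    static Map.Entry<Long, Msg> read(TreeMap<Long, Msg> log, long offset) {
        return log.ceilingEntry(offset);
    }

    public static void main(String[] args) {
        TreeMap<Long, Msg> log = new TreeMap<>();
        log.put(0L, new Msg("key1", "v1"));
        log.put(1L, new Msg("key2", "v1"));
        log.put(2L, new Msg("key1", "v2"));
        TreeMap<Long, Msg> compacted = compact(log);
        System.out.println(compacted);          // offsets of the surviving messages
        System.out.println(read(compacted, 0L)); // offset 0 is gone, so the next one is returned
    }
}
```

In this model the message at offset 0 is superseded by the later `key1` message, so a read at offset 0 falls through to the next surviving offset. The consumer below rebuilds a key-value view in the same spirit by reading each queue from the beginning.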
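```java
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;

public class CompactionConsumerExample {
    public static void main(String[] args) throws Exception {
        DefaultLitePullConsumer consumer = new DefaultLitePullConsumer("compactionTestGroup");
        consumer.setNamesrvAddr("localhost:9876");
        consumer.setPullThreadNums(4);
        consumer.start();
        // Assign every queue of the topic manually and rewind each one to its beginning.
        Collection<MessageQueue> messageQueueList = consumer.fetchMessageQueues("ctopic");
        consumer.assign(messageQueueList);
        messageQueueList.forEach(mq -> {
            try {
                consumer.seekToBegin(mq);
            } catch (MQClientException e) {
                e.printStackTrace();
            }
        });
        // Later messages overwrite earlier ones, so the map holds the latest body per key.
        Map<String, byte[]> kvStore = new HashMap<>();
        while (true) {
            List<MessageExt> msgList = consumer.poll(1000);
            if (msgList != null && !msgList.isEmpty()) {
                msgList.forEach(msg -> kvStore.put(msg.getKeys(), msg.getBody()));
            }
        }
        // use the kvStore
    }
}
```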

Author: admin Created: 2023-08-22 15:26
Updated: 2023-08-22 16:22