当前位置:
首页
文章
移动端
详情

第二章 RocketMQ 基本消息

基本消息发送有三种姿势:同步、异步、单向。

  • 同步:消息发送到 Broker 成功后,返回发送成功结果;这种可靠性同步地发送方式使用的比较广泛,比如:重要的消息通知,短信通知。
  • 异步:消息发送出去后立即返回结果,可以在发送成功的消息回调中,查看消息是否发送成功;异步消息通常用在对响应时间敏感的业务场景,即发送端不能容忍长时间地等待 Broker 的响应。
  • 单向:消息发送出去,Broker 不返回结果。这种方式主要用在不特别关心发送结果的场景,例如日志发送。

一、同步发送

在 第一章 RocketMQ 搭建调试环境 中,演示了消息的同步发送。

SendResult sendResult = producer.send(msg)

二、异步发送

改造 org.apache.rocketmq.example.simple.AsyncProducer

public class AsyncProducer {
    public static void main(String[] args) throws MQClientException, InterruptedException {

        DefaultMQProducer producer = new DefaultMQProducer("Jodie_Daily_test");
        producer.setNamesrvAddr("127.0.0.1:9876");
        // https://blog.csdn.net/heihaozi/article/details/119145266
        DefaultChannelId.newInstance();

        producer.start();
        // 异步发送失败重试,可能导致消息重复发送,需要保证消息幂等性
        producer.setRetryTimesWhenSendAsyncFailed(0);

        int messageCount = 100;
        final CountDownLatch countDownLatch = new CountDownLatch(messageCount);
        for (int i = 0; i < messageCount; i++) {
            try {
                final int index = i;
                Message msg = new Message("TopicTest",
                    "TagA",
                    "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
                // 异步发送模式
                producer.send(msg, new SendCallback() {
                    @Override
                    public void onSuccess(SendResult sendResult) {
                        countDownLatch.countDown();
                        System.out.printf("%-10d OK %s %n", index, sendResult.getMsgId());
                    }

                    @Override
                    public void onException(Throwable e) {
                        countDownLatch.countDown();
                        System.out.printf("%-10d Exception %s %n", index, e);
                        e.printStackTrace();
                    }
                });
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        countDownLatch.await(5, TimeUnit.SECONDS);
        producer.shutdown();
    }
}

三、单向发送

改造 org.apache.rocketmq.example.simple.OnewayProducer

            Message msg = new Message("TopicTest" /* Topic */,
                    "TagA" /* Tag */,
                    ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
            );
            // 单向发送
            producer.sendOneway(msg);

四、消息消费

在 第一章 RocketMQ 搭建调试环境 中,演示了消息的接收。

参考

RocketMQ 基本样例

免责申明:本站发布的内容(图片、视频和文字)以转载和分享为主,文章观点不代表本站立场,如涉及侵权请联系站长邮箱:xbc-online@qq.com进行反馈,一经查实,将立刻删除涉嫌侵权内容。