本文共 8823 字,大约阅读时间需要 29 分钟。
导致的问题:消息到达服务器之前丢失,那么持久化也不能解决此问题,因为消息根本就没有到达 Rabbit 服务器!
关键代码:
channel.txSelect();channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());channel.txCommit();
import cn.ctoedu.rabbitmq.util.ConnectionUtils;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import org.junit.Test;import java.io.IOException;public class SendMQ { private static final String QUEUE_NAME = "QUEUE_simple"; @Test public void sendMsg() throws IOException, TimeoutException {/* 获取一个连接 */ Connection connection = ConnectionUtils.getConnection();/* 从连接中创建通道 */ Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); String msg = "Hello Simple QUEUE !"; try { channel.txSelect(); channel.basicPublish("", QUEUE_NAME, null, msg.getBytes()); int result = 1 / 0; channel.txCommit(); } catch (Exception e) { channel.txRollback(); System.out.println("----msg rollabck "); } finally { System.out.println("---------send msg over:" + msg); } channel.close(); connection.close(); }}
import cn.ctoedu.rabbitmq.util.ConnectionUtils;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.DefaultConsumer;import com.rabbitmq.client.Envelope;import java.io.IOException;public class Consumer { private static final String QUEUE_NAME = "QUEUE_simple"; public static void main(String[] args) throws Exception { Connection connection = ConnectionUtils.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); DefaultConsumer consumer = new DefaultConsumer(channel) { //获取到达的消息 @Override public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println(" [x] Received '" + message + "'"); } }; //监听队列 channel.basicConsume(QUEUE_NAME, true, consumer); }}
此种模式还是很耗时的,采用这种方式 降低了 Rabbitmq 的消息吞吐量
上面我们介绍了 RabbitMQ 可能会遇到的一个问题,即生成者不知道消息是否真正到达 broker,随后通过 AMQP 协议层面为我们提供了事务机制解决了这个问题,但是采用事务机制实现会降低RabbitMQ 的消息吞吐量,那么有没有更加高效的解决方式呢?答案是采用 Confirm 模式。
生产者将信道设置成 confirm 模式,一旦信道进入 confirm 模式,所有在该信道上面发布的消息都会被指派一个唯一的 ID(从 1 开始)
一旦消息被投递到所有匹配的队列之后,broker 就会发送一个确认给生产者(包含消息的唯一ID),这就使得生产者知道消息已经正确到达目的队列了
如果消息和队列是可持久化的,那么确认消息会将消息写入磁盘之后发出,broker 回传给生产者的确认消息中 deliver-tag 域包含了确认消息的序列号
此外 broker 也可以设置 basic.ack 的 multiple 域,表示到这个序列号之前的所有消息都已经得到了处理。
confirm 模式最大的好处在于他是异步的,一旦发布一条消息,生产者应用程序就可以在等信道返回确认的同时继续发送下一条消息,当消息最终得到确认之后,生产者应用便可以通过回调方法来处理该确认消息,
如果RabbitMQ 因为自身内部错误导致消息丢失,就会发送一条 nack 消息,生产者应用程序同样可以在回调方法中处理该 nack 消息。
** 已经在 transaction 事务模式的 channel 是不能再设置成 confirm 模式的,即这两种模式是不能共存的。**
生产者通过调用 channel 的 confirmSelect 方法将 channel 设置为 confirm 模式
核心代码:
//生产者通过调用channel的confirmSelect方法将channel设置为confirm模式channel.confirmSelect();
编程模式
import cn.ctoedu.rabbitmq.util.ConnectionUtils;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import org.junit.Test;import java.io.IOException;public class SendConfirm { private static final String QUEUE_NAME = "QUEUE_simple_confirm"; @Test public void sendMsg() throws IOException, TimeoutException, InterruptedException { /* 获取一个连接 */ Connection connection = ConnectionUtils.getConnection(); /* 从连接中创建通道 */ Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); //生产者通过调用channel的confirmSelect方法将channel设置为confirm模式 channel.confirmSelect(); String msg = "Hello QUEUE !"; channel.basicPublish("", QUEUE_NAME, null, msg.getBytes()); if (!channel.waitForConfirms()) { System.out.println("send message failed."); } else { System.out.println(" send messgae ok ..."); } channel.close(); connection.close(); }}
批量 confirm 模式稍微复杂一点,客户端程序需要定期(每隔多少秒)或者定量(达到多少条)或者两则结合起来publish 消息,然后等待服务器端 confirm, 相比普通 confirm 模式,批量极大提升 confirm 效率,但是问题在于一旦出现 confirm 返回 false 或者超时的情况时,客户端需要将这一批次的消息全部重发,这会带来明显的重复消息数量,并且,当消息经常丢失时,批量 confirm 性能应该是不升反降的。
import cn.ctoedu.rabbitmq.util.ConnectionUtils;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import org.junit.Test;import java.io.IOException;public class SendbatchConfirm { private static final String QUEUE_NAME = "QUEUE_simple_confirm"; @Test public void sendMsg() throws IOException, TimeoutException, InterruptedException { /* 获取一个连接 */ Connection connection = ConnectionUtils.getConnection(); /* 从连接中创建通道 */ Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); //生产者通过调用channel的confirmSelect方法将channel设置为confirm模式 channel.confirmSelect(); String msg = "Hello QUEUE !"; for (int i = 0; i < 10; i++) { channel.basicPublish("", QUEUE_NAME, null, msg.getBytes()); } if (!channel.waitForConfirms()) { System.out.println("send message failed."); } else { System.out.println(" send messgae ok ..."); } channel.close(); connection.close(); }}
import cn.ctoedu.rabbitmq.util.ConnectionUtils;import com.rabbitmq.client.Channel;import com.rabbitmq.client.ConfirmListener;import com.rabbitmq.client.Connection;import java.io.IOException;import java.util.Collections;import java.util.SortedSet;import java.util.TreeSet;public class SendAync { private static final String QUEUE_NAME = "QUEUE_simple_confirm_aync"; public static void main(String[] args) throws IOException, TimeoutException { /* 获取一个连接 */ Connection connection = ConnectionUtils.getConnection(); /* 从连接中创建通道 */ Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); //生产者通过调用channel的confirmSelect方法将channel设置为confirm模式 channel.confirmSelect(); final SortedSet confirmSet = Collections.synchronizedSortedSet(new TreeSet ()); channel.addConfirmListener(new ConfirmListener() { //每回调一次handleAck方法,unconfirm集合删掉相应的一条(multiple=false)或多条(multiple=true)记录。 @Override public void handleAck(long deliveryTag, boolean multiple) throws IOException { if (multiple) { System.out.println("--multiple--"); confirmSet.headSet(deliveryTag + 1).clear();//用一个SortedSet, 返回此有序集合中小于end的所有元素。 } else { System.out.println("--multiple false--"); confirmSet.remove(deliveryTag); } } @Override public void handleNack(long deliveryTag, boolean multiple) throws IOException { System.out.println("Nack, SeqNo: " + deliveryTag + ", multiple: " + multiple); if (multiple) { confirmSet.headSet(deliveryTag + 1).clear(); } else { confirmSet.remove(deliveryTag); } } }); String msg = "Hello QUEUE !"; while (true) { long nextSeqNo = channel.getNextPublishSeqNo(); channel.basicPublish("", QUEUE_NAME, null, msg.getBytes()); confirmSet.add(nextSeqNo); } }}
转载地址:http://mhonn.baihongyu.com/