public class Send {
public static void main(String[] args) throws Exception{ ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); /* * 分发给多个消费者:发布/订阅 模式 * */ //此处声明交换机 参数1 交换机名字 参数2 交换机类型 channel.exchangeDeclare("logs", "fanout"); String con = "声明一个交换机 666"; channel.basicPublish("logs", "", null, con.getBytes()); channel.close(); connection.close(); }}
public class Receive {
public static void main(String[] args){ try{ ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare("logs", "fanout"); String queuename = channel.queueDeclare().getQueue();//获取队列名称 //System.out.println(queuename); /* * 已经声明了交换机 也声明了 队列 * 现在需要将交换机绑定队列,让交换机将信息发送给队列 * */ channel.queueBind(queuename, "logs", ""); Consumer callback = new Consumer() { @Override public void handleShutdownSignal(String consumerTag, ShutdownSignalException sig) { } @Override public void handleRecoverOk(String consumerTag) { } @Override public void handleDelivery(String arg0, Envelope arg1, BasicProperties arg2, byte[] arg3) throws IOException { System.out.println("内容:"+new String(arg3,"utf-8")); } @Override public void handleConsumeOk(String consumerTag) { } @Override public void handleCancelOk(String consumerTag) { } @Override public void handleCancel(String consumerTag) throws IOException { } }; channel.basicConsume(queuename,true, callback); channel.close(); connection.close(); }catch(Exception e){ throw new RuntimeException(); } }}