本文共 7397 字,大约阅读时间需要 24 分钟。
在上篇文章,阿堂和大家分享了《分布式消息队列中间件系列研究之阿堂教程(基础篇-Local模式)》,后面由于时间关系,就一直没有接着写了。忙里偷闲,昨天晚上在家写了大部分,今天抽点时间阿堂将继续为大家奉献完成《分布式消息队列中间件系列研究之阿堂教程(进阶篇)》。这里阿堂结合发送邮件的一个相对具体的案例,应该说是有一定的代表性的,相对比较深入的剖析开源框架metq分布式消息队列的使用。相信通过阿堂的分享后,大家就基本明白分布式消息队列是怎么回事和大致知道如何使用了。当然,网友们如果想非常深入的学习和使用metq,建议网友们可以直接到metq的官网去学习和了解。
metq使用的大致流程如下所示
public class Productor {private static Log log = LogFactory.getLog(Productor.class);
public static void main(String[] args) throws Exception {//由消息工厂产生消息生产者
MessageProducer producer = MessageSessionFactoryManager.getSessionFactory(true).createProducer(); //设置topic,必须要在server.ini文件中进行配置 final String topic = "email"; //发布topic producer.publish(topic); //发布内容(根据实际业务来组装内容) String line = "网络时空(阿堂)恭喜大家2015年心想事成!"; try{ //模拟发邮件(根据实际业务定义,可以是发邮件,发短信,httpPost提交等均可) EmailRequest request = new EmailRequest("", topic,line); //序列化发送内容 String jsonString = JSON.toJSONString(request, SerializerFeature.WriteClassName); log.info("------------------------------------------------------------------------"); log.info("发布者发送的EmailAddress = "+request.getEmailAddress()); log.info("发布者发送的 Topic = "+request.getEmailTopic()); log.info("发布者发送的邮件的内容= "+request.getContent()); log.info("发布者发送的邮件发送的时间点= "+request.getCreateTime()); log.info("------------------------------------------------------------------------"); //发布订消息,这里老何定义了重写回调SendMessageCallbackImpl实现方法 producer.sendMessage(new Message(topic, jsonString.getBytes()),new SendMessageCallbackImpl(producer,topic,jsonString)); }catch(Exception ex){ log.error(ex); }}
}
------------------------------------
public class SendMessageCallbackImpl implements SendMessageCallback {private static Log log = LogFactory.getLog(SendMessageCallbackImpl.class);
private MessageProducer messageProducer; private String topic; private String content; public SendMessageCallbackImpl( MessageProducer messageProducer, String topic, String content) { super(); this.messageProducer = messageProducer; this.topic = topic; this.content = content; } public void onException(Throwable e) { log.fatal("metaq server exception , error message:" + e.getMessage()); log.info("--------------------------------------------------------------"); //出现异常时写入日志文件,进行补偿机制 logToFile(); log.info("--------------------------------------------------------------"); } public void onMessageSent(SendResult result) { log.info("result = "+result); if (!result.isSuccess()) { log.info("--------------------------------------------------------------"); log.warn("Send " + topic + " message failed,error message:" + result.getErrorMessage()); //没有收到broker服务器的正常应答时写入日志文件,进行补偿机制 logToFile(); } else { log.info("--------------------------------------------------------------"); log.info("Send "+topic+" successfully,sent to " + result.getPartition()+" "+result.getOffset()); log.info("--------------------------------------------------------------"); } } private void logToFile() { //定义日志文件 String fileName = MessageSessionFactoryManager.getMessagedir()+UUID.randomUUID().toString(); File file = new File(fileName); FileWriter fw = null; try { fw = new FileWriter(file); fw.write(topic+"\r\n"); fw.write(content); fw.flush(); } catch (IOException e) { log.error(e); } finally { if(fw != null) { try { fw.close(); } catch (IOException e) { } } } }}
------------------------------------
public class Consumer { private static Log log = LogFactory.getLog(Consumer.class); public static void main(String[] args) throws Exception {final String topic = "email";
final String group = "meta-example"; try{ //复用SessionFacotory(单例模式) 发布者和订阅者共用sessionFactory MessageConsumer consumer = MessageSessionFactoryManager.getSessionFactory(false).createConsumer(new ConsumerConfig(group)); //每次订阅300k的字节流内容,将订阅信息保存到本地 consumer.subscribe(topic, 1024 * 300, new EmailMessageListener()); //completeSubscribe一次性将所有的订阅生效,并处理zk和metaq服务器的所有交互过程 consumer.completeSubscribe(); }catch(Exception ex){ log.error(ex); }}
}
------------------------------------
public class EmailMessageListener implements MessageListener { private static Log log = LogFactory.getLog(EmailMessageListener.class);public void recieveMessages(Message message) throws InterruptedException {
try { log.info("------------------------------------------------------------------------"); log.info("Receive Email message, BrokerId-Partition:" + message.getPartition().getBrokerId() + "," + message.getPartition().getPartition()+", "+message.getTopic()+" ,"+message.getId()); //反序列化接收内容 EmailRequest emailRequest = JSON.parseObject(new String(message.getData()), EmailRequest.class); log.info("------------------------------------------------------------------------"); log.info("订阅者接收到的EmailAddress = "+emailRequest.getEmailAddress()); log.info("订阅者接收到的 Topic = "+emailRequest.getEmailTopic()); log.info("订阅者接收到的邮件的内容= "+emailRequest.getContent()); log.info("订阅者接收到的邮件发送的时间点= "+emailRequest.getCreateTime()); //这里目前网友可以根据当前系统时间 - emailRequest.getCreateTime() 比较,超过多长时间,可以丢弃此次订阅信息,不消费,直接return log.info("------------------------------------------------------------------------"); log.info("订阅者开始订阅消息啦!"); log.info("开始发送邮件啦!"); //发邮件的代码逻辑(过程略),与本文介绍的内容没有太大联系! //执行发送邮件的代码逻辑 log.info("发送邮件成功啦!"); log.info("订阅者结束订阅消息啦!"); log.info("结束发送邮件啦!"); log.info("End Send Email:" + emailRequest.getEmailAddress()); log.info("------------------------------------------------------------------------"); } catch(Exception ex) { log.error("EmailMessageListener exception", ex); } }public Executor getExecutor() {
// TODO Auto-generated method stub return null; }}
----------------------------------
public class MessageSessionFactoryManager { private static Log log = LogFactory.getLog(MessageSessionFactoryManager.class); private static MessageSessionFactory sessionFactory = null; private static String messagedir; public static String getMessagedir() { return messagedir; } private MessageSessionFactoryManager() { } public synchronized static MessageSessionFactory getSessionFactory() { if(sessionFactory == null) { init(true); } return sessionFactory; } public synchronized static MessageSessionFactory getSessionFactory(boolean isProducer) { if(sessionFactory == null) { init(isProducer); } return sessionFactory; } private static void init(boolean isProducer) { try { String confFile = "/metaq.ini"; InputStream in = EmailMessageProducerManager.class .getResourceAsStream(confFile);Properties conf = new Properties();
conf.load(in);String zookeeper = conf.getProperty("zookeeper");
messagedir = conf.getProperty("messagedir"); final MetaClientConfig metaClientConfig = new MetaClientConfig(); final ZKConfig zkConfig = new ZKConfig(); zkConfig.zkConnect = zookeeper; metaClientConfig.setZkConfig(zkConfig); sessionFactory = new MetaMessageSessionFactory(metaClientConfig); //如果是生产者,则传入值为真;如果是消费者,则传入值为假。可以共用同一个sessionFactory if(isProducer) { //补偿机制,这里主要是针对如下两种情况 //发布者在onException中产生了异常,或者 result.isSuccess()返回值不成功,写会入到meaq.ini文件中对应的messagedir目录下 //然后由定时器每间隔30分钟扫描一次,扫到文件后,然后进行补偿机制进行重新由producer重新发布 Timer timer = new Timer(); //在2秒后执行此任务,每次间隔半小时扫描 D:\metaq\mmp\logs 目录下的异常文件,进行补偿机制发布消息 MesaageExceptionHandleTask()实现方法很简单(由于字数限制不贴上去了)timer.schedule(new MesaageExceptionHandleTask(), 2000, 1000*30*60);
} } catch (Exception e) { log.error(e); throw new RuntimeException(e.getCause()); } } public static MessageProducer getMessageProducer() { return getSessionFactory(true).createProducer(); }}
本文转自 www19 51CTO博客,原文链接:http://blog.51cto.com/doujh/1715275,如需转载请自行联系原作者