๋ณธ๋ฌธ ๋ฐ”๋กœ๊ฐ€๊ธฐ

etc.

Message Queue (MQ)

๋ฉ”์‹œํ‚ค ํ๋ž€?

๋ฐ์ดํ„ฐ๋ฅผ ์ฒ˜๋ฆฌํ•˜๋Š” ๊ณผ์ •์ด ํฌ๊ฒŒ ๋‘ ๊ฐ€์ง€๋กœ ๋‚˜๋‰œ๋‹ค. ์‹ค์‹œ๊ฐ„์œผ๋กœ ์ฒ˜๋ฆฌํ•˜๋Š” ์˜จ๋ผ์ธ ์ž‘์—…, ๊ทธ๋ฆฌ๊ณ  ์ผ๊ด„์ ์œผ๋กœ ๋ชจ์•„์„œ ์ฒ˜๋ฆฌํ•˜๋Š” ๋ฐฐ์น˜ ์ž‘์—…. ๋ฉ”์‹œ์ง€ ํ๋Š” ๋ฐฐ์น˜ ์ž‘์—…์„ ์œ„ํ•ด ์œ ์šฉํ•˜๊ฒŒ ์‚ฌ์šฉ๋œ๋‹ค.

๋ฉ”์‹œ์ง€ ํ๋Š” ๊ธฐ๋ณธ์ ์œผ๋กœ ๋ฉ”์‹œ์ง€๋ฅผ ์ „๋‹ฌํ•˜๊ธฐ ์œ„ํ•œ ์ž„์‹œ ์ €์žฅ์†Œ์ด๋ฉฐ, ๋‹ค์Œ๊ณผ ๊ฐ™์€ ๊ตฌ์„ฑ ์š”์†Œ๋กœ ์ด๋ฃจ์–ด์ ธ ์žˆ๋‹ค.

  1. ์ƒ์‚ฐ์ž (Producer): ๋ฉ”์‹œ์ง€๋ฅผ ์ƒ์„ฑํ•˜๊ณ  ํ์— ๋ณด๋‚ด๋Š” ์—ญํ• 
  2. ๋ฉ”์‹œํ‚ค ํ (Message Queue): ์ƒ์‚ฐ์ž๋กœ๋ถ€ํ„ฐ ๋ฉ”์‹œ์ง€๋ฅผ ๋ฐ›์•„ ์ €์žฅํ•˜๊ณ  ์žˆ๋‹ค๊ฐ€, ์†Œ๋น„์ž๊ฐ€ ์ค€๋น„๋˜๋ฉด ๋ฉ”์‹œ์ง€๋ฅผ ์ „๋‹ฌ
  3. ์†Œ๋น„์ž (Consumer): ํ์—์„œ ๋ฉ”์‹œ์ง€๋ฅผ ๋ฐ›์•„ ์ฒ˜๋ฆฌ

 

 

์ด ๊ตฌ์กฐ์—์„œ ์ค‘๊ฐ„์— ๋ฉ”์‹œ์ง€ ํ๊ฐ€ ๋ผ์–ด์žˆ์Œ์œผ๋กœ ์ƒ๊ธฐ๋Š” ์ด์ ์€

  • ๋น„๋™๊ธฐ: ์ƒ์‚ฐ์ž์™€ ์†Œ๋น„์ž๊ฐ€ ๋ฉ”์‹œ์ง€๋ฅผ ๋™์‹œ์— ์ฒ˜๋ฆฌํ•˜์ง€ ์•Š์•„๋„ ๋จ. ์˜จ๋ผ์ธ ์ž‘์—…๊ณผ๋Š” ๋‹ค๋ฅด๊ฒŒ ๋ฐฐ์น˜ ์ž‘์—…์€ ๋Œ€๊ฐœ์˜ ๊ฒฝ์šฐ ์–‘์ชฝ ํ”„๋กœ์„ธ์Šค๊ฐ€ ๊นจ์–ด์žˆ์ง€ ์•Š์€ ์ƒํƒœ์—์„œ (ํŠนํžˆ ๋ฐค์—) ๋Œ์•„๊ฐ€๊ธฐ ๋•Œ๋ฌธ์— ๋น„๋™๊ธฐ ํ†ต์‹ ์ด ์ค‘์š”ํ•จ.
  • ํƒ„๋ ฅ์„ฑ: ์‹œ์Šคํ…œ ์žฅ์• ๊ฐ€ ๋ฐœ์ƒํ•ด๋„ ๋ฉ”์‹œ์ง€๊ฐ€ ์†์‹ค๋˜์ง€ ์•Š๋Š”๋‹ค. ๋ฉ”์‹œ์ง€๋Š” ๋ฉ”์‹œ์ง€ ํ์— ๋‚จ์•„ ์žˆ๊ธฐ ๋•Œ๋ฌธ์—.
  • ๋‚ฎ์€ ๊ฒฐํ•ฉ๋„: ์ƒ์‚ฐ์ž์™€ ์†Œ๋น„์ž๊ฐ€ ๋…๋ฆฝ์ ์œผ๋กœ ํ–‰๋™ํ•˜๊ฒŒ ๋จ์œผ๋กœ์จ ์„œ๋น„์Šค๋“ค ๊ฐ„ ๊ฒฐํ•ฉ๋„๊ฐ€ ๋‚ฎ์•„์ง.

 


๊ตฌํ˜„ ์˜ˆ์‹œ (IBM MQ)

์•„๋ž˜ ์ฝ”๋“œ๋Š” IBM MQ์— ์—ฐ๊ฒฐํ•˜๊ณ  ํ๋ฅผ ๊ด€๋ฆฌํ•˜๋Š” Java ํด๋ž˜์Šค๋ฅผ ๊ตฌํ˜„ํ•œ ์˜ˆ์‹œ. IBM MQ ์™ธ์—๋„ RabbitMQ, ActiveMQ, Apache Kafka ๋“ฑ ์—ฌ๋Ÿฌ ์œ ์‚ฌํ•œ ๋ฉ”์‹œ์ง€ ํ์ž‰ ์‹œ์Šคํ…œ์ด ์žˆ๋‹ค. 

 

IbmMqImpl.java

๋”๋ณด๊ธฐ
/*
 * IBM MQ general methods including:
 * - client connection
 * - browse, get, put messages
 * - commit, rollback
 * - release
 */

import java.io.*;
import java.security.*;
import java.util.*;

import com.ibm.mq.*;
import com.ibm.mq.constants.CMQC;

public class IbmMqImpl {
	protected static int retryInterval = 60 * 1000;
	protected static int retryMax = 3;

	private MQQueueManager queueManager;
	private String queueManagerName;
	private String ccdt; // client channel definition table

	private Map<String, MQQueue> putQueues = new Hashtable<String, MQQueue>();
	private Map<String, MQQueue> getQueues = new Hashtable<String, MQQueue>();
	private Map<String, MQQueue> browseQueues = new Hashtable<String, MQQueue>();

	// Creates a connection to the named queue manager using a channel definition table.
	public IbmMQImpl(String queueManagerName, String ccdt) throws IbmMqException {
		this.queueManagerName = queueManagerName;
		this.ccdt = ccdt;
		java.net.URL url = IbmMqImpl.class.getClassLoader().getResource(ccdt);
		
		try {
			url.openConnection();
		} catch (Exception e) {
			e.printStackTrace();
			throw new IbmMqException("An error occured while connecting " + ccdt + ".", e);
		}
		
		boolean finish = false;
		int retryCnt = 0;
		
		while(!finish) {
			try {
				queueManager = new MQQueueManager(queueManagerName, url);
				finish = true;
			} catch (MQException e) {
				if (e.reasonCode == CMQC.MQRC_CONNECTION_BROKEN) {
					try {
						retryCnt++;
						if (retryCnt >= retryMax) {
							finish = true;
						}
						Thread.sleep(retryInterval);
					} catch (InterruptedException e1) {
						throw new IbmMqException("An error occured while connecting to " + queueManagerName + ": " + e.reasonCode, e);
					}
				} else {
					throw new IbmMqException("An error occured while connecting to " + queueManagerName + ": " + e.reasonCode, e);
				}
			}
		}
	}

	private MQQueue getBrowseQueue(String queueName) throws MQException {
		MQQueue queue = null;
		queue = (MQQueue) browseQueues.get(queueName);
		if (queue == null) {
			int openOptions = CMQC.MQOO_BROWSE | CMQC.MQOO_INPUT_SHARED | CMQC.MQOO_FAIL_IF_QUIESCING
					| CMQC.MQOO_BIND_NOT_FIXED;
			queue = queueManager.accessQueue(queueName, openOptions);
			browseQueues.put(queueName, queue);
		}
		return queue;
	}

	private MQQueue getGetQueue(String queueName) throws MQException {
		MQQueue queue = null;
		queue = (MQQueue) getQueues.get(queueName);
		if (queue == null) {
			int openOptions = CMQC.MQOO_INPUT_AD_Q_DEF | CMQC.MQOO_INPUT_SHARED | CMQC.MQOO_FAIL_IF_QUIESCING
					| CMQC.MQOO_BIND_NOT_FIXED;
			queue = queueManager.accessQueue(queueName, openOptions);
			getQueues.put(queueName, queue);
		}
		return queue;
	}

	private MQQueue getPutQueue(String queueName) throws MQException {
		MQQueue queue = null;
		queue = (MQQueue) putQueues.get(queueName);
		if (queue == null) {
			int openOptions = CMQC.MQOO_OUTPUT | CMQC.MQOO_FAIL_IF_QUIESCING | CMQC.MQOO_BIND_ON_OPEN;
			queue = queueManager.accessQueue(queueName, openOptions);
			putQueues.put(queueName, queue);
		}
		return queue;
	}

	public byte[] browseMessage(String queueName) throws IbmMqException {
		byte[] msg = null;
		MQQueue browseQueue = null;

		try {
			browseQueue = getBrowseQueue(queueName);
			MQGetMessageOptions gmo = new MQGetMessageOptions();
			gmo.options += CMQC.MQGMO_BROWSE_FIRST;
			gmo.options += CMQC.MQGMO_WAIT;
			gmo.options += CMQC.MQGMO_FAIL_IF_QUIESCING;
			gmo.options += CMQC.MQGMO_COMPLETE_MSG;
			gmo.waitInterval = 3000;

			MQMessage inMsg = new MQMessage();

			browseQueue.get(inMsg, gmo);

			msg = new byte[inMsg.getDataLength()];
			inMsg.readFully(msg);
		} catch (MQException e) {
			throw new IbmMqException("MQException occured while browsing the message.", e);
		} catch (IOEXception e) {
			throw new IbmMqException("IOEXception occured while browsing the message.", e);
		} catch (NumberFormatException e) {
			throw new IbmMqException("NumberFormatException occured while browsing the message.", e);
		}
		return msg;
	}

	public void getMessage(String queueName) throws IbmMqException {
		byte[] msg = null;
		MQQueue getQueue = null;

		try {
			getQueue = getGetQueue(queueName);
			MQGetMessageOptions gmo = new MQGetMessageOptions();
			gmo.options += CMQC.MQGMO_BROWSE_FIRST;
			gmo.options += CMQC.MQGMO_WAIT;
			gmo.options += CMQC.MQGMO_FAIL_IF_QUIESCING;
			gmo.options += CMQC.MQGMO_COMPLETE_MSG;
			gmo.waitInterval = 3000;

			MQMessage inMsg = new MQMessage();

			getQueue.get(inMsg, gmo);

			msg = new byte[inMsg.getDataLength()];
			inMsg.readFully(msg);
		} catch (MQException e) {
			throw new IbmMqException("MQException occured while getting the message.", e);
		} catch (IOEXception e) {
			throw new IbmMqException("IOEXception occured while getting the message.", e);
		}
		return msg;
	}

	public void putMessage(byte[] msg, String queueName) throws IbmMqException {
		MQQueue putQueue = null;
		try {
			putQueue = getPutQueue(queueName);
			MQPutMessageOptions mpmo = new MQPutMessageOptions();
			mpmo.options += CMQC.MQMO_FAIL_IF_QUIESCING;

			MQMessage mqMessage = nQMessage();
			mqMessage.messageFlags = CMQC.MQMF_SEew MGMENTATION_ALLOWED;

			mqMessage.characterSet = 1208;
			mqMessage.write(msg);

			putQueue.put(mqMessage, mpmo);
		} catch (MQException e) {
			throw new IbmMqException("MQException occured while sending the message.", e);
		} catch (IOEXception e) {
			throw new IbmMqException("IOEXception occured while sending the message.", e);
		}
	}

	public void deleteMessage(String queueName) throws IbmMqException {
		MQQueue getQueue = null;
		try {
			getQueue = getGetQueue(queueName);
			
			MQGetMessageOptions gmo - new MQGetMessageOptions();
			gmo.options += CMQC.MQGMO_FAIL_IF_QUIESCING;
			gmo.options += CMQC.MQGMO_COMPLETE_MSG;
			
			MQMessage inMsg = new MQMessage();
			getQueue.get(inMsg, gmo);
		} catch (MQException e) {
			throw new IbmMqException("MQException occured while deleting the message.", e);
		}
	}

	public void commit() throws IbmMqException {
		if (queueManager != null) {
			try {
				queueManager.commit();
			} catch (MQException e) {
				throw new IbmMqException("MQException occured while committing the resource.", e);
			}
		}
	}

	public void rollback() throws IbmMqException {
		if (queueManager != null) {
			try {
				queueManager.backout();
			} catch (MQException e) {
				throw new IbmMqException("MQException occured while rollback the resource.", e);
			}
		}
	}

	private void releaseQueue(Map<String, MQQueue> queues) throws IbmMqException {
		for (Iterator<String> it = queues.ketSet().iterator(); it.hasNext();) {
			String queueName = (String) it.next();
			MQQueue mqQueue = queues.get(queueName);

			if (mqQueue != null) {
				try {
					mqQueue.close();
				} catch (MQException e) {
					mqQueue = null;
					throw new IbmMqException("MQException occured while releasing the resource.", e);
				}
				mqQueue = null;
			}
		}
		queues.clear();
	}

	public void release() throws IbmMqException {
		releaseQueue(putQueues);
		releaseQueue(getQueues);
		releaseQueue(browseQueues);

		if (queueManager != null) {
			try {
				queueManager.close();
				queueManager.disconnect();
			} catch (MQException e) {
				queueManager = null;
				throw new IbmMqException("MQException occured while releasing the resource.", e);
			}
			queueManager = null;
		}
	}
}

 

IbmMqException.java

๋”๋ณด๊ธฐ
public class IbmMqException extends Exception {
	public IbmMqException(String msg, Exception e) {
		super(msg);
	}
}