메시지 큐란?
데이터를 처리하는 과정이 크게 두 가지로 나뉜다. 실시간으로 처리하는 온라인 작업, 그리고 일괄적으로 모아서 처리하는 배치 작업. 메시지 큐는 배치 작업을 위해 유용하게 사용된다.
메시지 큐는 기본적으로 메시지를 전달하기 위한 임시 저장소이며, 다음과 같은 구성 요소로 이루어져 있다.
- 생산자 (Producer): 메시지를 생성하고 큐에 보내는 역할
- 메시지 큐 (Message Queue): 생산자로부터 메시지를 받아 저장하고 있다가, 소비자가 준비되면 메시지를 전달
- 소비자 (Consumer): 큐에서 메시지를 받아 처리
이 구조에서 중간에 메시지 큐가 끼어있음으로 생기는 이점은
- 비동기: 생산자와 소비자가 메시지를 동시에 처리하지 않아도 됨. 온라인 작업과는 다르게 배치 작업은 대개의 경우 양쪽 프로세스가 깨어있지 않은 상태에서 (특히 밤에) 돌아가기 때문에 비동기 통신이 중요함.
- 탄력성: 시스템 장애가 발생해도 메시지가 손실되지 않는다. 메시지는 메시지 큐에 남아 있기 때문에.
- 낮은 결합도: 생산자와 소비자가 독립적으로 활동하게 됨으로써 서비스들 간 결합도가 낮아짐.
구현 예시 (IBM MQ)
์๋ ์ฝ๋๋ IBM MQ์ ์ฐ๊ฒฐํ๊ณ ํ๋ฅผ ๊ด๋ฆฌํ๋ Java ํด๋์ค๋ฅผ ๊ตฌํํ ์์. IBM MQ ์ธ์๋ RabbitMQ, ActiveMQ, Apache Kafka ๋ฑ ์ฌ๋ฌ ์ ์ฌํ ๋ฉ์์ง ํ์ ์์คํ ์ด ์๋ค.
IbmMqImpl.java
더보기
/*
* IBM MQ general methods including:
* - client connection
* - browse, get, put messages
* - commit, rollback
* - release
*/
import java.io.*;
import java.security.*;
import java.util.*;
import com.ibm.mq.*;
import com.ibm.mq.constants.CMQC;
public class IbmMqImpl {
protected static int retryInterval = 60 * 1000;
protected static int retryMax = 3;
private MQQueueManager queueManager;
private String queueManagerName;
private String ccdt; // client channel definition table
private Map<String, MQQueue> putQueues = new Hashtable<String, MQQueue>();
private Map<String, MQQueue> getQueues = new Hashtable<String, MQQueue>();
private Map<String, MQQueue> browseQueues = new Hashtable<String, MQQueue>();
// Creates a connection to the named queue manager using a channel definition table.
public IbmMQImpl(String queueManagerName, String ccdt) throws IbmMqException {
this.queueManagerName = queueManagerName;
this.ccdt = ccdt;
java.net.URL url = IbmMqImpl.class.getClassLoader().getResource(ccdt);
try {
url.openConnection();
} catch (Exception e) {
e.printStackTrace();
throw new IbmMqException("An error occured while connecting " + ccdt + ".", e);
}
boolean finish = false;
int retryCnt = 0;
while(!finish) {
try {
queueManager = new MQQueueManager(queueManagerName, url);
finish = true;
} catch (MQException e) {
if (e.reasonCode == CMQC.MQRC_CONNECTION_BROKEN) {
try {
retryCnt++;
if (retryCnt >= retryMax) {
finish = true;
}
Thread.sleep(retryInterval);
} catch (InterruptedException e1) {
throw new IbmMqException("An error occured while connecting to " + queueManagerName + ": " + e.reasonCode, e);
}
} else {
throw new IbmMqException("An error occured while connecting to " + queueManagerName + ": " + e.reasonCode, e);
}
}
}
}
private MQQueue getBrowseQueue(String queueName) throws MQException {
MQQueue queue = null;
queue = (MQQueue) browseQueues.get(queueName);
if (queue == null) {
int openOptions = CMQC.MQOO_BROWSE | CMQC.MQOO_INPUT_SHARED | CMQC.MQOO_FAIL_IF_QUIESCING
| CMQC.MQOO_BIND_NOT_FIXED;
queue = queueManager.accessQueue(queueName, openOptions);
browseQueues.put(queueName, queue);
}
return queue;
}
private MQQueue getGetQueue(String queueName) throws MQException {
MQQueue queue = null;
queue = (MQQueue) getQueues.get(queueName);
if (queue == null) {
int openOptions = CMQC.MQOO_INPUT_AD_Q_DEF | CMQC.MQOO_INPUT_SHARED | CMQC.MQOO_FAIL_IF_QUIESCING
| CMQC.MQOO_BIND_NOT_FIXED;
queue = queueManager.accessQueue(queueName, openOptions);
getQueues.put(queueName, queue);
}
return queue;
}
private MQQueue getPutQueue(String queueName) throws MQException {
MQQueue queue = null;
queue = (MQQueue) putQueues.get(queueName);
if (queue == null) {
int openOptions = CMQC.MQOO_OUTPUT | CMQC.MQOO_FAIL_IF_QUIESCING | CMQC.MQOO_BIND_ON_OPEN;
queue = queueManager.accessQueue(queueName, openOptions);
putQueues.put(queueName, queue);
}
return queue;
}
public byte[] browseMessage(String queueName) throws IbmMqException {
byte[] msg = null;
MQQueue browseQueue = null;
try {
browseQueue = getBrowseQueue(queueName);
MQGetMessageOptions gmo = new MQGetMessageOptions();
gmo.options += CMQC.MQGMO_BROWSE_FIRST;
gmo.options += CMQC.MQGMO_WAIT;
gmo.options += CMQC.MQGMO_FAIL_IF_QUIESCING;
gmo.options += CMQC.MQGMO_COMPLETE_MSG;
gmo.waitInterval = 3000;
MQMessage inMsg = new MQMessage();
browseQueue.get(inMsg, gmo);
msg = new byte[inMsg.getDataLength()];
inMsg.readFully(msg);
} catch (MQException e) {
throw new IbmMqException("MQException occured while browsing the message.", e);
} catch (IOEXception e) {
throw new IbmMqException("IOEXception occured while browsing the message.", e);
} catch (NumberFormatException e) {
throw new IbmMqException("NumberFormatException occured while browsing the message.", e);
}
return msg;
}
public void getMessage(String queueName) throws IbmMqException {
byte[] msg = null;
MQQueue getQueue = null;
try {
getQueue = getGetQueue(queueName);
MQGetMessageOptions gmo = new MQGetMessageOptions();
gmo.options += CMQC.MQGMO_BROWSE_FIRST;
gmo.options += CMQC.MQGMO_WAIT;
gmo.options += CMQC.MQGMO_FAIL_IF_QUIESCING;
gmo.options += CMQC.MQGMO_COMPLETE_MSG;
gmo.waitInterval = 3000;
MQMessage inMsg = new MQMessage();
getQueue.get(inMsg, gmo);
msg = new byte[inMsg.getDataLength()];
inMsg.readFully(msg);
} catch (MQException e) {
throw new IbmMqException("MQException occured while getting the message.", e);
} catch (IOEXception e) {
throw new IbmMqException("IOEXception occured while getting the message.", e);
}
return msg;
}
public void putMessage(byte[] msg, String queueName) throws IbmMqException {
MQQueue putQueue = null;
try {
putQueue = getPutQueue(queueName);
MQPutMessageOptions mpmo = new MQPutMessageOptions();
mpmo.options += CMQC.MQMO_FAIL_IF_QUIESCING;
MQMessage mqMessage = nQMessage();
mqMessage.messageFlags = CMQC.MQMF_SEew MGMENTATION_ALLOWED;
mqMessage.characterSet = 1208;
mqMessage.write(msg);
putQueue.put(mqMessage, mpmo);
} catch (MQException e) {
throw new IbmMqException("MQException occured while sending the message.", e);
} catch (IOEXception e) {
throw new IbmMqException("IOEXception occured while sending the message.", e);
}
}
public void deleteMessage(String queueName) throws IbmMqException {
MQQueue getQueue = null;
try {
getQueue = getGetQueue(queueName);
MQGetMessageOptions gmo - new MQGetMessageOptions();
gmo.options += CMQC.MQGMO_FAIL_IF_QUIESCING;
gmo.options += CMQC.MQGMO_COMPLETE_MSG;
MQMessage inMsg = new MQMessage();
getQueue.get(inMsg, gmo);
} catch (MQException e) {
throw new IbmMqException("MQException occured while deleting the message.", e);
}
}
public void commit() throws IbmMqException {
if (queueManager != null) {
try {
queueManager.commit();
} catch (MQException e) {
throw new IbmMqException("MQException occured while committing the resource.", e);
}
}
}
public void rollback() throws IbmMqException {
if (queueManager != null) {
try {
queueManager.backout();
} catch (MQException e) {
throw new IbmMqException("MQException occured while rollback the resource.", e);
}
}
}
private void releaseQueue(Map<String, MQQueue> queues) throws IbmMqException {
for (Iterator<String> it = queues.ketSet().iterator(); it.hasNext();) {
String queueName = (String) it.next();
MQQueue mqQueue = queues.get(queueName);
if (mqQueue != null) {
try {
mqQueue.close();
} catch (MQException e) {
mqQueue = null;
throw new IbmMqException("MQException occured while releasing the resource.", e);
}
mqQueue = null;
}
}
queues.clear();
}
public void release() throws IbmMqException {
releaseQueue(putQueues);
releaseQueue(getQueues);
releaseQueue(browseQueues);
if (queueManager != null) {
try {
queueManager.close();
queueManager.disconnect();
} catch (MQException e) {
queueManager = null;
throw new IbmMqException("MQException occured while releasing the resource.", e);
}
queueManager = null;
}
}
}
IbmMqException.java
더보기
/**
 * Checked exception raised by the IBM MQ wrapper when a connection, queue or
 * message operation fails.
 */
public class IbmMqException extends Exception {
    private static final long serialVersionUID = 1L;

    /**
     * Creates an exception with a description and the underlying failure.
     *
     * @param msg description of the operation that failed
     * @param e the underlying failure; kept as the cause so its stack trace and
     *        details stay available to callers (may be {@code null})
     */
    public IbmMqException(String msg, Exception e) {
        // Pass the cause through; calling super(msg) alone would discard it.
        super(msg, e);
    }
}