在 OMS(订单管理系统)中,大量业务需要与 WMS、TMS 以及其他系统进行异步交互。
例如:
如果全部采用同步 HTTP 调用:
因此系统引入 RabbitMQ 作为异步事件总线。
整体架构如下:
OMS业务系统
│
▼
OmsMessageMqProducer
│
▼
RabbitMQ Exchange
│
▼
RabbitMQ Queue
│
▼
OmsMessageListenerTemplate
│
▼
业务Consumer
│
▼
Domain Service
│
▼
WMS/TMS/DB
系统通过 MQ 实现:
系统中所有消息统一通过:
OmsMessageMqProducer
发送。
调用方式:
OmsMessage<OperateFanout> msg = new OmsMessage<>();
msg.setExchangeName("fanout.oms.ordercenter.operate");
msg.setExchangeType(ExchangeTypes.FANOUT);
msg.setData(data);
omsMessageMqProducer.publishMsg(msg);
系统没有直接发送业务对象。
而是统一封装:
OmsMessage<T>
典型结构:
public class OmsMessage<T>{
private String messageId;
private String exchangeName;
private String exchangeType;
private String routingKey;
private Integer retryTimes;
private Integer maxRetryTimes;
private T data;
}
作用:
发送前自动生成:
String messageId = IdUtil.fastSimpleUUID();
保证:
对于 Fanout Exchange:
if (ExchangeTypes.FANOUT.equals(message.getExchangeType())) {
return "";
}
Fanout 模式不依赖 RoutingKey。
广播到所有绑定队列。
发送时支持:
properties.setDelay(delayMillis);
实现:
publishMsg(message, delayMillis);
常用于:
发送成功:
OMS_MSG_SEND_EXITPOINT_SUCCESS
发送失败:
OMS_MSG_SEND_EXITPOINT_FAIL
记录:
方便排查问题。
系统采用:
@RabbitListener
监听消息。
例如:
@RabbitListener(
bindings = @QueueBinding(
value = @Queue(name = QUEUES),
exchange = @Exchange(name = EXCHANGE)
)
)
public void consumer(Channel channel, Message message)
消费统一走:
OmsMessageListenerTemplate.consume(...)
设计模式:
Template
↓
Callback
框架负责:
业务只负责:
MQ消息
↓
consume()
↓
deserialMsg()
↓
onMessage()
↓
成功/失败
每个 Consumer 自己实现:
@Override
public OmsMessage<OperateFanout> deserialMsg(String msg)
例如:
return objectMapper.readValue(
msg,
new TypeReference<OmsMessage<OperateFanout>>() {}
);
实现:
JSON → Java对象
转换。
核心逻辑位于:
onMessage()
例如:
public void onMessage(OmsMessage<OperateFanout> omsMessage)
处理:
审核通过
更新发运信息
换仓
推送仓库
事件。
最终调用:
pushWarehouse(...)
完成 WMS 推送。
MQ线程没有登录用户。
因此需要模拟系统身份:
IdentityDTO userDTO =
IdentityUtils.systemIdentity();
IdentityContextHolder.setCurrentIdentity(userDTO);
作用:
统一补齐。
业务异常:
throw new CommonException(...)
进入:
onCommonException(...)
处理。
系统异常:
onThrowable(...)
处理。
例如:
消费失败
↓
延迟队列
↓
TTL到期
↓
死信交换机
↓
原队列
↓
重新消费
系统动态创建:
temp.delay.queue.xxxx
配置:
x-message-ttl
x-dead-letter-exchange
x-dead-letter-routing-key
实现延迟重试。
retryTimes
maxRetryTimes
超过最大次数:
OMS_MESSAGE_EXITPOINT_RETRY_OVER
不再重试。
消费失败后:
InboundErrorCreateMessageBO
封装错误信息。
发送:
REQUIREMENT_ERROR_SAVE
错误队列。
流程:
消费失败
↓
记录错误信息
↓
发送错误MQ
↓
补偿系统处理
实现:
能力。
消费过程中:
if (TransactionSynchronizationManager.isSynchronizationActive()) {
TransactionAspectSupport
.currentTransactionStatus()
.setRollbackOnly();
}
作用:
保证:
数据库
+
MQ消费
一致性。
通过注解:
@BizId
private String orderNo;
提取关键业务字段。
通过:
@OmsBizIdentity
标识业务域。
例如:
OMS
WMS
TMS
自动记录:
rt
统计接口耗时。
该 RabbitMQ 框架并不仅仅是简单的消息发送与消费。
本质上是一个企业级消息治理平台。
核心能力包括:
通过 Producer Framework 与 Consumer Framework 的统一治理,实现 OMS、WMS、TMS 等系统之间稳定可靠的异步通信能力。
评论