原创

《企业级 RabbitMQ 消息治理实践:运去哪 OMS 自研 MQ 框架解析》


RabbitMQ 在 OMS 系统中的架构设计与实现

一、背景

在 OMS(订单管理系统)中,大量业务需要与 WMS、TMS 以及其他系统进行异步交互。

例如:

  • 入库需求单审核通过
  • 推送 WMS 创建 ASN
  • 换仓
  • 更新发运信息
  • 轨迹订阅
  • 状态回传

如果全部采用同步 HTTP 调用:

  • 系统耦合度高
  • 响应时间长
  • 容易出现级联故障
  • 系统扩展能力差

因此系统引入 RabbitMQ 作为异步事件总线。


二、整体架构

整体架构如下:

OMS业务系统
       │
       ▼
OmsMessageMqProducer
       │
       ▼
RabbitMQ Exchange
       │
       ▼
RabbitMQ Queue
       │
       ▼
OmsMessageListenerTemplate
       │
       ▼
业务Consumer
       │
       ▼
Domain Service
       │
       ▼
WMS/TMS/DB

系统通过 MQ 实现:

  • 系统解耦
  • 削峰填谷
  • 异步处理
  • 最终一致性

三、生产端设计

3.1 统一生产入口

系统中所有消息统一通过:

OmsMessageMqProducer

发送。

调用方式:

OmsMessage<OperateFanout> msg = new OmsMessage<>();
msg.setExchangeName("fanout.oms.ordercenter.operate");
msg.setExchangeType(ExchangeTypes.FANOUT);
msg.setData(data);

omsMessageMqProducer.publishMsg(msg);

3.2 OmsMessage 统一消息结构

系统没有直接发送业务对象。

而是统一封装:

OmsMessage<T>

典型结构:

public class OmsMessage<T>{

    private String messageId;

    private String exchangeName;

    private String exchangeType;

    private String routingKey;

    private Integer retryTimes;

    private Integer maxRetryTimes;

    private T data;
}

作用:

  • 消息统一
  • 支持重试
  • 支持幂等
  • 支持日志追踪

3.3 MessageId 自动生成

发送前自动生成:

String messageId = IdUtil.fastSimpleUUID();

保证:

  • 消息唯一
  • 支持问题排查
  • 支持幂等控制

3.4 Fanout 路由处理

对于 Fanout Exchange:

if (ExchangeTypes.FANOUT.equals(message.getExchangeType())) {
    return "";
}

Fanout 模式不依赖 RoutingKey。

广播到所有绑定队列。


3.5 延迟消息

发送时支持:

properties.setDelay(delayMillis);

实现:

publishMsg(message, delayMillis);

常用于:

  • 延迟补偿
  • 延迟重试
  • 超时检查

3.6 日志埋点

发送成功:

OMS_MSG_SEND_EXITPOINT_SUCCESS

发送失败:

OMS_MSG_SEND_EXITPOINT_FAIL

记录:

  • exchange
  • routingKey
  • messageId
  • bizId

方便排查问题。


四、消费端设计

4.1 MQ消费入口

系统采用:

@RabbitListener

监听消息。

例如:

@RabbitListener(
    bindings = @QueueBinding(
        value = @Queue(name = QUEUES),
        exchange = @Exchange(name = EXCHANGE)
    )
)
public void consumer(Channel channel, Message message)

4.2 Template + Callback 模式

消费统一走:

OmsMessageListenerTemplate.consume(...)

设计模式:

Template
    ↓
Callback

框架负责:

  • 消息解析
  • 日志记录
  • 异常处理
  • 重试机制
  • 事务控制

业务只负责:

  • 业务逻辑实现

4.3 消费生命周期

MQ消息
   ↓
consume()
   ↓
deserialMsg()
   ↓
onMessage()
   ↓
成功/失败

五、消息反序列化

每个 Consumer 自己实现:

@Override
public OmsMessage<OperateFanout> deserialMsg(String msg)

例如:

return objectMapper.readValue(
    msg,
    new TypeReference<OmsMessage<OperateFanout>>() {}
);

实现:

JSON → Java对象

转换。


六、业务处理

核心逻辑位于:

onMessage()

例如:

public void onMessage(OmsMessage<OperateFanout> omsMessage)

处理:

审核通过
更新发运信息
换仓
推送仓库

事件。

最终调用:

pushWarehouse(...)

完成 WMS 推送。


七、ThreadLocal 上下文

MQ线程没有登录用户。

因此需要模拟系统身份:

IdentityDTO userDTO =
    IdentityUtils.systemIdentity();

IdentityContextHolder.setCurrentIdentity(userDTO);

作用:

  • createdBy
  • updatedBy
  • 操作日志
  • 审计信息

统一补齐。


八、异常处理体系

8.1 CommonException

业务异常:

throw new CommonException(...)

进入:

onCommonException(...)

处理。


8.2 Throwable

系统异常:

onThrowable(...)

处理。

例如:

  • NPE
  • 数据库异常
  • 网络异常

九、延迟重试机制

9.1 重试流程

消费失败
   ↓
延迟队列
   ↓
TTL到期
   ↓
死信交换机
   ↓
原队列
   ↓
重新消费

9.2 Fanout重试实现

系统动态创建:

temp.delay.queue.xxxx

配置:

x-message-ttl

x-dead-letter-exchange

x-dead-letter-routing-key

实现延迟重试。


9.3 重试次数控制

retryTimes

maxRetryTimes

超过最大次数:

OMS_MESSAGE_EXITPOINT_RETRY_OVER

不再重试。


十、错误补偿机制

消费失败后:

InboundErrorCreateMessageBO

封装错误信息。

发送:

REQUIREMENT_ERROR_SAVE

错误队列。

流程:

消费失败
    ↓
记录错误信息
    ↓
发送错误MQ
    ↓
补偿系统处理

实现:

  • 自动补偿
  • 人工补偿

能力。


十一、事务控制

消费过程中:

if (TransactionSynchronizationManager.isSynchronizationActive()) {

    TransactionAspectSupport
        .currentTransactionStatus()
        .setRollbackOnly();
}

作用:

  • MQ失败
  • 回滚数据库事务

保证:

数据库
+
MQ消费

一致性。


十二、日志追踪体系

BizId

通过注解:

@BizId
private String orderNo;

提取关键业务字段。


BizIdentity

通过:

@OmsBizIdentity

标识业务域。

例如:

OMS
WMS
TMS

RT统计

自动记录:

rt

统计接口耗时。


十三、MQ框架核心能力

消费治理

  • Template模式
  • Callback模式
  • 异常统一处理

高可靠

  • 延迟重试
  • 错误补偿
  • 最大重试次数控制

可观测

  • BizId
  • BizIdentity
  • RT统计

一致性

  • Spring事务
  • MQ补偿

十四、总结

该 RabbitMQ 框架并不仅仅是简单的消息发送与消费。

本质上是一个企业级消息治理平台。

核心能力包括:

  • 消息标准化
  • 统一生产入口
  • 统一消费入口
  • 延迟重试机制
  • 错误补偿机制
  • 日志链路追踪
  • 事务一致性控制

通过 Producer Framework 与 Consumer Framework 的统一治理,实现 OMS、WMS、TMS 等系统之间稳定可靠的异步通信能力。

https://share.note.youdao.com/yws/api/personal/file/WEBa98030adf1168aba0a4a82eeff7f225a?method=download&shareKey=ce957b6e30baa504a5a771e0be1b0d76

总结
经验
  • 作者:阿杰(联系作者)
  • 发表时间:2026-06-18T09:24:16
  • 版权声明:杰出版
  • 公众号:--无
  • 评论