进销存系统多仓库库存同步与调拨管理
多仓库库存管理概述
对于规模较大的商贸企业,通常需要管理多个仓库的库存。多仓库管理的核心挑战在于:各仓库库存的实时同步、跨仓库调拨的流程控制、库存分配策略等问题。
仓库基础数据设计
| 字段 | 类型 | 说明 |
|---|---|---|
| warehouse_id | varchar(32) | 仓库主键 |
| warehouse_code | varchar(20) | 仓库编码 |
| warehouse_name | varchar(100) | 仓库名称 |
| warehouse_type | varchar(20) | 仓库类型:main/branch/virtual |
| is_default | tinyint(1) | 是否默认仓库 |
| status | tinyint | 状态:0禁用 1启用 |
分布式库存设计模式
1. 集中式库存
所有仓库共享一个库存总表,通过仓库ID区分:
-- 集中式库存表
CREATE TABLE `inventory` (
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`warehouse_id` varchar(32) NOT NULL COMMENT '仓库ID',
`product_id` varchar(32) NOT NULL COMMENT '商品ID',
`sku_id` varchar(32) NOT NULL COMMENT 'SKU ID',
`quantity` int(11) DEFAULT '0' COMMENT '可用库存',
`locked_quantity` int(11) DEFAULT '0' COMMENT '锁定库存',
`total_quantity` int(11) DEFAULT '0' COMMENT '总库存',
`update_time` datetime DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (`id`),
UNIQUE KEY `uk_warehouse_sku` (`warehouse_id`, `sku_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
2. 分布式库存(推荐)
每个仓库独立的库存表,通过分库分表规则分布:
-- 库存分表逻辑(按仓库ID哈希)
// 分表规则:warehouse_inventory_${hash(warehouse_id) % 100}
// 路由服务
class InventoryRouter {
static getTableName(warehouseId) {
const hash = this.hashCode(warehouseId);
const tableIndex = Math.abs(hash % 100);
return `warehouse_inventory_${tableIndex}`;
}
static hashCode(str) {
let hash = 0;
for (let i = 0; i < str.length; i++) {
hash = ((hash << 5) - hash) + str.charCodeAt(i);
hash = hash & hash;
}
return hash;
}
}
库存同步方案
1. 实时同步(强一致性)
// 库存同步服务 - 基于消息队列
class InventorySyncService {
constructor(ctx) {
this.mq = ctx.mq; // 消息队列
}
// 库存变更发布
async publishInventoryChange(warehouseId, skuId, changeQty, changeType) {
const message = {
warehouseId,
skuId,
changeQty,
changeType, // in/out/adjust/transfer
bizId: generateBizId(),
timestamp: Date.now()
};
// 发送到库存同步队列
await this.mq.publish('inventory-sync', message);
}
// 监听库存变更并同步
async handleInventorySync(message) {
const { warehouseId, skuId, changeQty } = message;
// 更新本地库存
await this.updateLocalInventory(warehouseId, skuId, changeQty);
// 同步到中心库存(如果有中心仓库概念)
if (warehouseId !== 'CENTER') {
await this.syncToCenter(warehouseId, skuId, changeQty);
}
}
// 更新本地库存(乐观锁)
async updateLocalInventory(warehouseId, skuId, changeQty) {
const tableName = InventoryRouter.getTableName(warehouseId);
let retry = 3;
while (retry > 0) {
try {
// 读取当前库存
const inventory = await this.ctx.db.query(
`SELECT quantity, version FROM ${tableName}
WHERE warehouse_id = ? AND sku_id = ? FOR UPDATE`,
[warehouseId, skuId]
);
if (!inventory) {
throw new Error('库存记录不存在');
}
const newQty = inventory.quantity + changeQty;
if (newQty < 0) {
throw new Error('库存不足');
}
// 更新库存(版本控制)
const affected = await this.ctx.db.execute(
`UPDATE ${tableName}
SET quantity = ?, version = version + 1
WHERE warehouse_id = ? AND sku_id = ? AND version = ?`,
[newQty, warehouseId, skuId, inventory.version]
);
if (affected === 0) {
retry--;
continue; // 乐观锁冲突,重试
}
return true;
} catch (err) {
if (retry > 0 && err.message.includes('版本')) {
retry--;
continue;
}
throw err;
}
}
}
}
2. 最终一致性同步(性能优先)
// 基于 Canal 的增量同步
// 配置 Canal 监听 binlog 变化
class CanalInventorySync {
constructor() {
this.canalClient = new CanalClient({
zookeeperAddresses: ['127.0.0.1:2181'],
filter: 'jxc\\.inventory.*'
});
}
async start() {
await this.canalClient.connect();
await this.canalClient.subscribe('jxc.*');
while (true) {
const message = await this.canalClient.get();
if (message.entries.length > 0) {
await this.processEntries(message.entries);
}
}
}
async processEntries(entries) {
for (const entry of entries) {
if (entry.entryType === 'ROWDATA') {
const rowChange = RowChange.parseFrom(entry);
for (const rowData of rowChange.rowDatas) {
await this.syncInventory(rowData);
}
}
}
}
async syncInventory(rowData) {
const afterColumns = rowData.afterColumns;
const warehouseId = this.getColumnValue(afterColumns, 'warehouse_id');
const skuId = this.getColumnValue(afterColumns, 'sku_id');
const quantity = this.getColumnValue(afterColumns, 'quantity');
// 同步到 Redis 缓存
await this.redis.set(`inventory:${warehouseId}:${skuId}`, quantity, 'EX', 3600);
// 同步到 Elasticsearch(用于搜索)
await this.es.index({
index: 'inventory',
id: `${warehouseId}_${skuId}`,
body: { warehouseId, skuId, quantity }
});
}
}
跨仓库调拨流程
// 调拨单服务
class TransferService {
// 创建调拨单
async createTransfer(fromWarehouse, toWarehouse, items) {
const transferNo = generateTransferNo();
// 开启事务
const tx = await this.ctx.db.transaction();
try {
// 1. 锁定调出仓库库存
for (const item of items) {
const locked = await this.lockInventory(
fromWarehouse, item.skuId, item.quantity, tx
);
if (!locked) {
throw new Error(`商品 ${item.skuId} 库存不足`);
}
}
// 2. 创建调拨单(待发货状态)
await this.ctx.model.Transfer.create([{
transferNo,
fromWarehouse,
toWarehouse,
items,
status: 'pending',
createTime: new Date()
}], { session: tx });
await tx.commit();
return transferNo;
} catch (err) {
await tx.abort();
throw err;
}
}
// 调出仓库发货
async shipTransfer(transferId) {
const transfer = await this.ctx.model.Transfer.findById(transferId);
const tx = await this.ctx.db.transaction();
try {
// 扣减调出仓库库存
for (const item of transfer.items) {
await this.ctx.db.execute(
`UPDATE inventory SET quantity = quantity - ?,
locked_quantity = locked_quantity - ?,
total_quantity = total_quantity - ?
WHERE warehouse_id = ? AND sku_id = ?`,
[item.quantity, item.quantity, item.quantity,
transfer.fromWarehouse, item.skuId],
{ session: tx }
);
// 记录出库流水
await this.ctx.model.InventoryLog.create([{
warehouseId: transfer.fromWarehouse,
skuId: item.skuId,
changeQty: -item.quantity,
bizType: 'transfer_out',
refNo: transfer.transferNo,
createTime: new Date()
}], { session: tx });
}
// 更新调拨单状态
await this.ctx.model.Transfer.updateOne(
{ _id: transferId },
{ status: 'shipped', shipTime: new Date() },
{ session: tx }
);
await tx.commit();
} catch (err) {
await tx.abort();
throw err;
}
}
// 调入仓库入库
async receiveTransfer(transferId) {
const transfer = await this.ctx.model.Transfer.findById(transferId);
const tx = await this.ctx.db.transaction();
try {
// 增加调入仓库库存
for (const item of transfer.items) {
await this.ctx.db.execute(
`INSERT INTO inventory (warehouse_id, sku_id, quantity, total_quantity)
VALUES (?, ?, ?, ?)
ON DUPLICATE KEY UPDATE
quantity = quantity + ?, total_quantity = total_quantity + ?`,
[transfer.toWarehouse, item.skuId, item.quantity, item.quantity,
item.quantity, item.quantity],
{ session: tx }
);
// 记录入库流水
await this.ctx.model.InventoryLog.create([{
warehouseId: transfer.toWarehouse,
skuId: item.skuId,
changeQty: item.quantity,
bizType: 'transfer_in',
refNo: transfer.transferNo,
createTime: new Date()
}], { session: tx });
}
await this.ctx.model.Transfer.updateOne(
{ _id: transferId },
{ status: 'completed', receiveTime: new Date() },
{ session: tx }
);
await tx.commit();
} catch (err) {
await tx.abort();
throw err;
}
}
}
库存预警配置
| 预警类型 | 触发条件 | 通知方式 |
|---|---|---|
| 库存不足 | 可用库存 < 安全库存 | 系统消息/短信/邮件 |
| 库存积压 | 库存 > 最大库存 | 系统消息 |
| 效期预警 | 效期 < N 天 | 系统消息/邮件 |
总结
多仓库库存管理需要根据业务规模选择合适的架构方案。中小规模可以使用集中式库存方案,业务规模大、并发高的场景建议采用分布式库存设计。库存同步是核心功能,需要在强一致性和性能之间找到平衡点。