个人编程网站

进销存(JXC)软件开发技术积累与分享

进销存系统多仓库库存同步与调拨管理

多仓库库存管理概述

对于规模较大的商贸企业,通常需要管理多个仓库的库存。多仓库管理的核心挑战在于:各仓库库存的实时同步、跨仓库调拨的流程控制、库存分配策略等问题。

仓库基础数据设计

字段 类型 说明
warehouse_id varchar(32) 仓库主键
warehouse_code varchar(20) 仓库编码
warehouse_name varchar(100) 仓库名称
warehouse_type varchar(20) 仓库类型:main/branch/virtual
is_default tinyint(1) 是否默认仓库
status tinyint 状态:0禁用 1启用

分布式库存设计模式

1. 集中式库存

所有仓库共享一个库存总表,通过仓库ID区分:

-- 集中式库存表
CREATE TABLE `inventory` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT,
  `warehouse_id` varchar(32) NOT NULL COMMENT '仓库ID',
  `product_id` varchar(32) NOT NULL COMMENT '商品ID',
  `sku_id` varchar(32) NOT NULL COMMENT 'SKU ID',
  `quantity` int(11) DEFAULT '0' COMMENT '可用库存',
  `locked_quantity` int(11) DEFAULT '0' COMMENT '锁定库存',
  `total_quantity` int(11) DEFAULT '0' COMMENT '总库存',
  `update_time` datetime DEFAULT CURRENT_TIMESTAMP,
  PRIMARY KEY (`id`),
  UNIQUE KEY `uk_warehouse_sku` (`warehouse_id`, `sku_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

2. 分布式库存(推荐)

每个仓库独立的库存表,通过分库分表规则分布:

-- 库存分表逻辑(按仓库ID哈希)
// 分表规则:warehouse_inventory_${hash(warehouse_id) % 100}

// 路由服务
class InventoryRouter {
    static getTableName(warehouseId) {
        const hash = this.hashCode(warehouseId);
        const tableIndex = Math.abs(hash % 100);
        return `warehouse_inventory_${tableIndex}`;
    }

    static hashCode(str) {
        let hash = 0;
        for (let i = 0; i < str.length; i++) {
            hash = ((hash << 5) - hash) + str.charCodeAt(i);
            hash = hash & hash;
        }
        return hash;
    }
}

库存同步方案

1. 实时同步(强一致性)

// 库存同步服务 - 基于消息队列
class InventorySyncService {
    constructor(ctx) {
        this.mq = ctx.mq;  // 消息队列
    }

    // 库存变更发布
    async publishInventoryChange(warehouseId, skuId, changeQty, changeType) {
        const message = {
            warehouseId,
            skuId,
            changeQty,
            changeType,  // in/out/adjust/transfer
            bizId: generateBizId(),
            timestamp: Date.now()
        };

        // 发送到库存同步队列
        await this.mq.publish('inventory-sync', message);
    }

    // 监听库存变更并同步
    async handleInventorySync(message) {
        const { warehouseId, skuId, changeQty } = message;

        // 更新本地库存
        await this.updateLocalInventory(warehouseId, skuId, changeQty);

        // 同步到中心库存(如果有中心仓库概念)
        if (warehouseId !== 'CENTER') {
            await this.syncToCenter(warehouseId, skuId, changeQty);
        }
    }

    // 更新本地库存(乐观锁)
    async updateLocalInventory(warehouseId, skuId, changeQty) {
        const tableName = InventoryRouter.getTableName(warehouseId);

        let retry = 3;
        while (retry > 0) {
            try {
                // 读取当前库存
                const inventory = await this.ctx.db.query(
                    `SELECT quantity, version FROM ${tableName}
                     WHERE warehouse_id = ? AND sku_id = ? FOR UPDATE`,
                    [warehouseId, skuId]
                );

                if (!inventory) {
                    throw new Error('库存记录不存在');
                }

                const newQty = inventory.quantity + changeQty;
                if (newQty < 0) {
                    throw new Error('库存不足');
                }

                // 更新库存(版本控制)
                const affected = await this.ctx.db.execute(
                    `UPDATE ${tableName}
                     SET quantity = ?, version = version + 1
                     WHERE warehouse_id = ? AND sku_id = ? AND version = ?`,
                    [newQty, warehouseId, skuId, inventory.version]
                );

                if (affected === 0) {
                    retry--;
                    continue; // 乐观锁冲突,重试
                }

                return true;
            } catch (err) {
                if (retry > 0 && err.message.includes('版本')) {
                    retry--;
                    continue;
                }
                throw err;
            }
        }
    }
}

2. 最终一致性同步(性能优先)

// 基于 Canal 的增量同步
// 配置 Canal 监听 binlog 变化
class CanalInventorySync {
    constructor() {
        this.canalClient = new CanalClient({
            zookeeperAddresses: ['127.0.0.1:2181'],
            filter: 'jxc\\.inventory.*'
        });
    }

    async start() {
        await this.canalClient.connect();
        await this.canalClient.subscribe('jxc.*');

        while (true) {
            const message = await this.canalClient.get();
            if (message.entries.length > 0) {
                await this.processEntries(message.entries);
            }
        }
    }

    async processEntries(entries) {
        for (const entry of entries) {
            if (entry.entryType === 'ROWDATA') {
                const rowChange = RowChange.parseFrom(entry);
                for (const rowData of rowChange.rowDatas) {
                    await this.syncInventory(rowData);
                }
            }
        }
    }

    async syncInventory(rowData) {
        const afterColumns = rowData.afterColumns;
        const warehouseId = this.getColumnValue(afterColumns, 'warehouse_id');
        const skuId = this.getColumnValue(afterColumns, 'sku_id');
        const quantity = this.getColumnValue(afterColumns, 'quantity');

        // 同步到 Redis 缓存
        await this.redis.set(`inventory:${warehouseId}:${skuId}`, quantity, 'EX', 3600);

        // 同步到 Elasticsearch(用于搜索)
        await this.es.index({
            index: 'inventory',
            id: `${warehouseId}_${skuId}`,
            body: { warehouseId, skuId, quantity }
        });
    }
}

跨仓库调拨流程

// 调拨单服务
class TransferService {
    // 创建调拨单
    async createTransfer(fromWarehouse, toWarehouse, items) {
        const transferNo = generateTransferNo();

        // 开启事务
        const tx = await this.ctx.db.transaction();

        try {
            // 1. 锁定调出仓库库存
            for (const item of items) {
                const locked = await this.lockInventory(
                    fromWarehouse, item.skuId, item.quantity, tx
                );
                if (!locked) {
                    throw new Error(`商品 ${item.skuId} 库存不足`);
                }
            }

            // 2. 创建调拨单(待发货状态)
            await this.ctx.model.Transfer.create([{
                transferNo,
                fromWarehouse,
                toWarehouse,
                items,
                status: 'pending',
                createTime: new Date()
            }], { session: tx });

            await tx.commit();
            return transferNo;
        } catch (err) {
            await tx.abort();
            throw err;
        }
    }

    // 调出仓库发货
    async shipTransfer(transferId) {
        const transfer = await this.ctx.model.Transfer.findById(transferId);

        const tx = await this.ctx.db.transaction();
        try {
            // 扣减调出仓库库存
            for (const item of transfer.items) {
                await this.ctx.db.execute(
                    `UPDATE inventory SET quantity = quantity - ?,
                     locked_quantity = locked_quantity - ?,
                     total_quantity = total_quantity - ?
                     WHERE warehouse_id = ? AND sku_id = ?`,
                    [item.quantity, item.quantity, item.quantity,
                     transfer.fromWarehouse, item.skuId],
                    { session: tx }
                );

                // 记录出库流水
                await this.ctx.model.InventoryLog.create([{
                    warehouseId: transfer.fromWarehouse,
                    skuId: item.skuId,
                    changeQty: -item.quantity,
                    bizType: 'transfer_out',
                    refNo: transfer.transferNo,
                    createTime: new Date()
                }], { session: tx });
            }

            // 更新调拨单状态
            await this.ctx.model.Transfer.updateOne(
                { _id: transferId },
                { status: 'shipped', shipTime: new Date() },
                { session: tx }
            );

            await tx.commit();
        } catch (err) {
            await tx.abort();
            throw err;
        }
    }

    // 调入仓库入库
    async receiveTransfer(transferId) {
        const transfer = await this.ctx.model.Transfer.findById(transferId);

        const tx = await this.ctx.db.transaction();
        try {
            // 增加调入仓库库存
            for (const item of transfer.items) {
                await this.ctx.db.execute(
                    `INSERT INTO inventory (warehouse_id, sku_id, quantity, total_quantity)
                     VALUES (?, ?, ?, ?)
                     ON DUPLICATE KEY UPDATE
                     quantity = quantity + ?, total_quantity = total_quantity + ?`,
                    [transfer.toWarehouse, item.skuId, item.quantity, item.quantity,
                     item.quantity, item.quantity],
                    { session: tx }
                );

                // 记录入库流水
                await this.ctx.model.InventoryLog.create([{
                    warehouseId: transfer.toWarehouse,
                    skuId: item.skuId,
                    changeQty: item.quantity,
                    bizType: 'transfer_in',
                    refNo: transfer.transferNo,
                    createTime: new Date()
                }], { session: tx });
            }

            await this.ctx.model.Transfer.updateOne(
                { _id: transferId },
                { status: 'completed', receiveTime: new Date() },
                { session: tx }
            );

            await tx.commit();
        } catch (err) {
            await tx.abort();
            throw err;
        }
    }
}

库存预警配置

预警类型 触发条件 通知方式
库存不足 可用库存 < 安全库存 系统消息/短信/邮件
库存积压 库存 > 最大库存 系统消息
效期预警 效期 < N 天 系统消息/邮件

总结

多仓库库存管理需要根据业务规模选择合适的架构方案。中小规模可以使用集中式库存方案,业务规模大、并发高的场景建议采用分布式库存设计。库存同步是核心功能,需要在强一致性和性能之间找到平衡点。

← 下一篇:进销存系统智能补货算法与需求预测