第 7 篇:事务管理 —— 数据一致性的底线
在前面的文章中,我们探讨了输入校验等重要话题。现在,让我们关注另一个关键的横切关注点:事务管理。
事务是保障数据一致性的底线,特别是在涉及多个数据操作的业务场景中。不正确的事务管理可能导致数据不一致、脏读、不可重复读等问题。
本地事务:@Transactional 的正确用法
在 NestJS + TypeORM 项目中,TypeORM 早期版本自带 @Transaction 装饰器(自 0.3 起已移除),而 @Transactional 装饰器则由 typeorm-transactional 这类社区库提供。让我们看看如何正确使用它们:
typescript
// 错误的做法:手动管理事务
/**
 * Order service variant that drives the database transaction by hand through a
 * TypeORM QueryRunner. The article presents this as the approach to avoid.
 */
@Injectable()
export class OrderService {
  constructor(
    @InjectRepository(Order)
    private readonly orderRepository: Repository<Order>,
    @InjectRepository(OrderItem)
    private readonly orderItemRepository: Repository<OrderItem>,
    @InjectRepository(Product)
    private readonly productRepository: Repository<Product>,
  ) {}

  /**
   * Creates an order and its items and decrements product stock, all through the
   * manager of a single manually controlled QueryRunner.
   *
   * @param createOrderDto - provides `userId` and `items` (each with `productId` and `quantity`).
   * @returns the saved order, with `totalAmount` set to the sum of quantity × price.
   * @throws BusinessException when a product is missing or has less stock than requested;
   *   any error is rethrown after the transaction has been rolled back.
   */
  async createOrder(createOrderDto: CreateOrderDto): Promise<Order> {
    // Managing the transaction by hand is error-prone
    const queryRunner = this.orderRepository.manager.connection.createQueryRunner();
    await queryRunner.connect();
    await queryRunner.startTransaction();
    try {
      // Create the order (total is filled in after the items are processed)
      const order = new Order();
      order.userId = createOrderDto.userId;
      order.totalAmount = 0;
      const savedOrder = await queryRunner.manager.save(order);
      // Create the order items and update stock
      let totalAmount = 0;
      for (const itemDto of createOrderDto.items) {
        const product = await queryRunner.manager.findOne(Product, itemDto.productId);
        if (!product || product.stock < itemDto.quantity) {
          throw new BusinessException(
            BusinessErrorCode.BUSINESS_LOGIC_ERROR,
            `Insufficient stock for product ${itemDto.productId}`,
          );
        }
        // Update stock
        product.stock -= itemDto.quantity;
        await queryRunner.manager.save(product);
        // Create the order item, using the product's current price
        const orderItem = new OrderItem();
        orderItem.orderId = savedOrder.id;
        orderItem.productId = itemDto.productId;
        orderItem.quantity = itemDto.quantity;
        orderItem.price = product.price;
        await queryRunner.manager.save(orderItem);
        totalAmount += itemDto.quantity * product.price;
      }
      // Update the order's total amount
      savedOrder.totalAmount = totalAmount;
      await queryRunner.manager.save(savedOrder);
      await queryRunner.commitTransaction();
      return savedOrder;
    } catch (error) {
      // Undo everything written through this QueryRunner, then surface the error
      await queryRunner.rollbackTransaction();
      throw error;
    } finally {
      // Always release the QueryRunner, whether the work committed or rolled back
      await queryRunner.release();
    }
  }
}
// 正确的做法:使用 @Transactional 装饰器
@Injectable()
export class OrderService {
// Receives the TypeORM repositories for Order, OrderItem and Product via @InjectRepository.
constructor(
  @InjectRepository(Order)
  private readonly orderRepository: Repository<Order>,
  @InjectRepository(OrderItem)
  private readonly orderItemRepository: Repository<OrderItem>,
  @InjectRepository(Product)
  private readonly productRepository: Repository<Product>,
) {}
/**
 * Creates an order and its items and decrements product stock using the injected
 * repositories. The method is decorated with `@Transactional()`, which the article
 * relies on to run all of these repository calls inside one transaction.
 *
 * @param createOrderDto - provides `userId` and `items` (each with `productId` and `quantity`).
 * @returns the saved order, with `totalAmount` set to the sum of quantity × price.
 * @throws BusinessException with `RESOURCE_NOT_FOUND` when a product does not exist, or with
 *   `BUSINESS_LOGIC_ERROR` when its stock is lower than the requested quantity.
 *
 * NOTE(review): stock is read and then written back without any visible row lock;
 * confirm whether concurrent orders for the same product need a pessimistic lock.
 */
@Transactional()
async createOrder(createOrderDto: CreateOrderDto): Promise<Order> {
  // Create the order first so the items can reference its id
  const order = this.orderRepository.create({
    userId: createOrderDto.userId,
    totalAmount: 0,
  });
  const savedOrder = await this.orderRepository.save(order);

  // Create the order items and update stock
  let totalAmount = 0;
  for (const itemDto of createOrderDto.items) {
    const product = await this.productRepository.findOne(itemDto.productId);
    // Report a missing product separately from insufficient stock so the caller
    // gets an accurate error code and message
    if (!product) {
      throw new BusinessException(
        BusinessErrorCode.RESOURCE_NOT_FOUND,
        `Product not found: ${itemDto.productId}`,
      );
    }
    if (product.stock < itemDto.quantity) {
      throw new BusinessException(
        BusinessErrorCode.BUSINESS_LOGIC_ERROR,
        `Insufficient stock for product ${itemDto.productId}`,
      );
    }

    // Update stock
    product.stock -= itemDto.quantity;
    await this.productRepository.save(product);

    // Create the order item, using the product's current price
    const orderItem = this.orderItemRepository.create({
      orderId: savedOrder.id,
      productId: itemDto.productId,
      quantity: itemDto.quantity,
      price: product.price,
    });
    await this.orderItemRepository.save(orderItem);
    totalAmount += itemDto.quantity * product.price;
  }

  // Update the order's total amount
  savedOrder.totalAmount = totalAmount;
  await this.orderRepository.save(savedOrder);
  return savedOrder;
}
}
分布式事务:Saga 模式实战(订单 → 扣库存 → 发券)
在微服务架构中,分布式事务是一个复杂的问题。Saga 模式是一种常用的解决方案:
typescript
// Saga 模式实现
/**
 * One step of a saga: a forward `action` plus the `compensation` that undoes it.
 * Both receive the saga's current data object.
 */
export interface SagaStep<T> {
  /** Forward operation. A non-undefined resolved value becomes the data for later steps. */
  action: (data: T) => Promise<any>;
  /** Undo operation, invoked only for steps whose action completed. */
  compensation: (data: T) => Promise<any>;
}

/**
 * Runs registered steps in order and, if one fails, compensates the steps that
 * already completed in reverse order.
 */
export class SagaOrchestrator {
  private steps: SagaStep<any>[] = [];

  /**
   * Appends a step to the saga.
   *
   * @param step - the action/compensation pair to register.
   * @returns this orchestrator, so calls can be chained.
   */
  addStep<T>(step: SagaStep<T>): SagaOrchestrator {
    this.steps.push(step);
    return this;
  }

  /**
   * Executes every step in registration order.
   *
   * @param initialData - data handed to the first step.
   * @returns the data as left by the last step.
   * @throws the error raised by the failing action, after compensations have run.
   */
  async execute<T>(initialData: T): Promise<T> {
    const executedSteps: SagaStep<any>[] = [];
    let currentData: any = initialData;
    try {
      for (const step of this.steps) {
        const nextData = await step.action(currentData);
        // Thread the action's result forward so later steps and compensations can
        // see what earlier steps produced; an action that resolves to undefined
        // leaves the data untouched.
        if (nextData !== undefined) {
          currentData = nextData;
        }
        executedSteps.push(step);
      }
      return currentData;
    } catch (error) {
      // Compensate completed steps, most recent first
      for (let i = executedSteps.length - 1; i >= 0; i--) {
        try {
          await executedSteps[i].compensation(currentData);
        } catch (compensationError) {
          // Best effort: keep compensating the remaining steps even if one fails
          console.error('Compensation failed:', compensationError);
        }
      }
      throw error;
    }
  }
}
// 订单服务
/**
 * Data shared by every step of the order saga. `orderId` is absent until the
 * first step has created the order.
 */
interface OrderSagaContext {
  orderDto: CreateOrderDto;
  orderId?: number;
}

/**
 * Coordinates order creation, inventory reservation and coupon issuing as a saga
 * built on SagaOrchestrator.
 */
@Injectable()
export class OrderSagaService {
  constructor(
    private readonly orderService: OrderService,
    private readonly inventoryService: InventoryService,
    private readonly couponService: CouponService,
  ) {}

  /**
   * Runs the three-step saga (create order → reserve inventory → issue coupon).
   *
   * @param createOrderDto - the order to create; its `items` drive the inventory step.
   * @returns the order as loaded through `orderService.getOrder` once all steps succeeded.
   * @throws whatever the failing step threw, as propagated by `saga.execute`.
   */
  async createOrderWithSaga(createOrderDto: CreateOrderDto): Promise<Order> {
    const saga = new SagaOrchestrator();

    // Step 1: create the order
    saga.addStep<OrderSagaContext>({
      action: async (data) => {
        const order = await this.orderService.createOrder(data.orderDto);
        // Record the id on the shared context object (and return that same object)
        // so later steps and compensations can see it however the orchestrator
        // passes data between steps.
        data.orderId = order.id;
        return data;
      },
      compensation: async (data) => {
        if (data.orderId) {
          await this.orderService.cancelOrder(data.orderId);
        }
      },
    });

    // Step 2: reserve inventory
    saga.addStep<OrderSagaContext>({
      action: async (data) => {
        const inventoryUpdates = data.orderDto.items.map(item => ({
          productId: item.productId,
          quantity: item.quantity,
        }));
        await this.inventoryService.reserveInventory(inventoryUpdates);
        return data;
      },
      compensation: async (data) => {
        const inventoryUpdates = data.orderDto.items.map(item => ({
          productId: item.productId,
          quantity: item.quantity,
        }));
        await this.inventoryService.releaseInventory(inventoryUpdates);
      },
    });

    // Step 3: issue the coupon
    saga.addStep<OrderSagaContext>({
      action: async (data) => {
        await this.couponService.issueCouponForOrder(this.requireOrderId(data));
        return data;
      },
      compensation: async (data) => {
        // Nothing to revoke if the order id was never produced
        if (data.orderId !== undefined) {
          await this.couponService.revokeCouponForOrder(data.orderId);
        }
      },
    });

    const result = await saga.execute<OrderSagaContext>({ orderDto: createOrderDto });
    return this.orderService.getOrder(this.requireOrderId(result));
  }

  /** Returns the context's order id, failing loudly if step 1 has not provided one. */
  private requireOrderId(context: OrderSagaContext): number {
    if (context.orderId === undefined) {
      throw new Error('Order saga context has no orderId');
    }
    return context.orderId;
  }
}
// 库存服务
/**
 * Reserves and releases product stock through the injected Product repository.
 */
@Injectable()
export class InventoryService {
  constructor(
    @InjectRepository(Product)
    private readonly productRepository: Repository<Product>,
  ) {}

  /**
   * Decrements stock for each requested product, one after another.
   *
   * @param updates - product ids with the quantity to take from each.
   * @throws BusinessException with `RESOURCE_NOT_FOUND` when a product does not exist, or
   *   with `BUSINESS_LOGIC_ERROR` when its stock is lower than the requested quantity.
   */
  @Transactional()
  async reserveInventory(updates: { productId: number; quantity: number }[]): Promise<void> {
    for (const { productId, quantity } of updates) {
      const product = await this.productRepository.findOne(productId);

      if (!product) {
        throw new BusinessException(
          BusinessErrorCode.RESOURCE_NOT_FOUND,
          `Product not found: ${productId}`,
        );
      }

      const hasEnoughStock = product.stock >= quantity;
      if (!hasEnoughStock) {
        throw new BusinessException(
          BusinessErrorCode.BUSINESS_LOGIC_ERROR,
          `Insufficient stock for product ${productId}`,
        );
      }

      product.stock = product.stock - quantity;
      await this.productRepository.save(product);
    }
  }

  /**
   * Adds the given quantities back to each product's stock. Products that cannot
   * be found are skipped.
   *
   * @param updates - product ids with the quantity to return to each.
   */
  @Transactional()
  async releaseInventory(updates: { productId: number; quantity: number }[]): Promise<void> {
    for (const { productId, quantity } of updates) {
      const product = await this.productRepository.findOne(productId);
      if (!product) {
        continue;
      }
      product.stock = product.stock + quantity;
      await this.productRepository.save(product);
    }
  }
}
// 优惠券服务
@Injectable()
export class CouponService {
// Receives the TypeORM repository for Coupon via @InjectRepository.
constructor(
  @InjectRepository(Coupon)
  private readonly couponRepository: Repository<Coupon>,
) {}
/**
 * Creates and saves a coupon for the given order: amount 10, status 'issued',
 * and a code built from the order id and the current timestamp.
 *
 * @param orderId - id of the order the coupon belongs to.
 */
@Transactional()
async issueCouponForOrder(orderId: number): Promise<void> {
  const code = `COUPON-${orderId}-${Date.now()}`;
  await this.couponRepository.save(
    this.couponRepository.create({
      orderId,
      code,
      amount: 10, // fixed-amount coupon
      status: 'issued',
    }),
  );
}
/**
 * Sets the status of every coupon matching the order id to 'revoked'.
 *
 * @param orderId - id of the order whose coupons are revoked.
 */
@Transactional()
async revokeCouponForOrder(orderId: number): Promise<void> {
  const matchingOrder = { orderId };
  await this.couponRepository.update(matchingOrder, { status: 'revoked' });
}
}
警惕:异步回调中的事务丢失
在异步操作中,事务上下文可能会丢失,导致数据不一致:
typescript
// 错误示例:异步回调中事务丢失
/**
 * Counter-example from the article: work scheduled with setTimeout runs after
 * `processOrder` has already returned, so it is not covered by the method's
 * transaction.
 */
@Injectable()
export class OrderService {
  constructor(
    @InjectRepository(Order)
    private readonly orderRepository: Repository<Order>,
  ) {}

  /**
   * Marks the order as 'processing', then schedules a second save one second later.
   *
   * @param orderId - id of the order to process.
   */
  @Transactional()
  async processOrder(orderId: number): Promise<void> {
    const order = await this.orderRepository.findOne(orderId);
    order.status = 'processing';
    await this.orderRepository.save(order);
    // Asynchronous work - the transaction context is lost
    setTimeout(async () => {
      // Nothing in this callback runs inside the transaction!
      order.status = 'completed';
      await this.orderRepository.save(order); // this is a separate, independent transaction
    }, 1000);
  }
}
// 正确示例:保持事务上下文
/**
 * Variant that keeps all work inside the decorated method by awaiting the
 * asynchronous processing before the final save.
 */
@Injectable()
export class OrderService {
  constructor(
    @InjectRepository(Order)
    private readonly orderRepository: Repository<Order>,
  ) {}

  /**
   * Marks the order as 'processing', awaits the asynchronous processing, then
   * marks it as 'completed'.
   *
   * @param orderId - id of the order to process.
   * @throws BusinessException with `RESOURCE_NOT_FOUND` when no order has this id.
   */
  @Transactional()
  async processOrder(orderId: number): Promise<void> {
    const order = await this.orderRepository.findOne(orderId);
    // Fail with a descriptive error instead of a TypeError on `order.status`
    if (!order) {
      throw new BusinessException(
        BusinessErrorCode.RESOURCE_NOT_FOUND,
        `Order not found: ${orderId}`,
      );
    }
    order.status = 'processing';
    await this.orderRepository.save(order);
    // Complete every operation before the method returns
    await this.performAsyncProcessing(order);
    order.status = 'completed';
    await this.orderRepository.save(order);
  }

  /**
   * Simulates asynchronous processing by resolving after one second.
   *
   * @param order - the order being processed (not used by the simulation).
   */
  private async performAsyncProcessing(order: Order): Promise<void> {
    return new Promise(resolve => {
      setTimeout(() => {
        // Processing logic would go here
        resolve();
      }, 1000);
    });
  }
}
// 或者使用事件驱动的方式
/**
 * Event-driven variant: the method only marks the order as 'processing' and
 * emits an event; follow-up work is left to whoever handles that event.
 */
@Injectable()
export class OrderService {
  constructor(
    @InjectRepository(Order)
    private readonly orderRepository: Repository<Order>,
    private readonly eventEmitter: EventEmitter2,
  ) {}

  /**
   * Marks the order as 'processing' and emits 'order.processing' with its id.
   *
   * @param orderId - id of the order to process.
   * @throws BusinessException with `RESOURCE_NOT_FOUND` when no order has this id.
   */
  @Transactional()
  async processOrder(orderId: number): Promise<void> {
    const order = await this.orderRepository.findOne(orderId);
    // Fail with a descriptive error instead of a TypeError on `order.status`
    if (!order) {
      throw new BusinessException(
        BusinessErrorCode.RESOURCE_NOT_FOUND,
        `Order not found: ${orderId}`,
      );
    }
    order.status = 'processing';
    await this.orderRepository.save(order);
    // Emit the event; the follow-up logic lives in the event handler.
    // NOTE(review): the event is emitted before this method returns, i.e. presumably
    // before the surrounding transaction commits - confirm that handlers tolerate this.
    this.eventEmitter.emit('order.processing', { orderId });
  }
}
@EventsHandler('order.processing')
export class OrderProcessingHandler {
// Receives the TypeORM repository for Order via @InjectRepository.
constructor(
  @InjectRepository(Order)
  private readonly orderRepository: Repository<Order>,
) {}
/**
 * Loads the order named in the event and marks it as 'completed'.
 *
 * @param event - carries the `orderId` of the order to complete.
 * @throws BusinessException with `RESOURCE_NOT_FOUND` when no order has this id.
 *
 * NOTE(review): the enclosing class is registered with `@EventsHandler('order.processing')`;
 * confirm that this decorator actually routes EventEmitter2 events to this method.
 */
@Transactional()
async handleOrderProcessing(event: { orderId: number }): Promise<void> {
  // Runs under this method's own @Transactional() decorator
  const order = await this.orderRepository.findOne(event.orderId);
  // Fail with a descriptive error instead of a TypeError on `order.status`
  if (!order) {
    throw new BusinessException(
      BusinessErrorCode.RESOURCE_NOT_FOUND,
      `Order not found: ${event.orderId}`,
    );
  }
  // Processing logic would go here
  order.status = 'completed';
  await this.orderRepository.save(order);
}
}
创业建议:尽量避免分布式事务,用最终一致性兜底
对于创业团队,建议尽量避免复杂的分布式事务,而是采用最终一致性方案:
typescript
// 最终一致性方案示例
/**
 * Outbox row: an event recorded in the database so it can be delivered later.
 */
@Entity()
export class OutboxMessage {
  @PrimaryGeneratedColumn()
  id: number;

  /** Event name under which the payload is later emitted. */
  @Column()
  eventType: string;

  /** Event data, stored as JSON. */
  @Column({ type: 'json' })
  payload: any;

  /** False until the message has been delivered. */
  @Column({ default: false })
  processed: boolean;

  /** Defaults to the database's CURRENT_TIMESTAMP on insert. */
  @Column({ type: 'timestamp', default: () => 'CURRENT_TIMESTAMP' })
  createdAt: Date;

  /**
   * When the message was delivered. The column is nullable and has no default,
   * so the type admits null for rows that have not been processed yet.
   */
  @Column({ type: 'timestamp', nullable: true })
  processedAt: Date | null;
}
@Injectable()
export class EventPublisher {
constructor(
@InjectRepository(OutboxMessage)
private readonly outboxRepository: Repository<OutboxMessage>,
) {}
@Transactional()
async publishEvent(eventType: string, payload: any): Promise<void> {
// 在业务事务中保存消息到 outbox
const message = this.outboxRepository.create({
eventType,
payload,
});
await this.outboxRepository.save(message);
}
}
@Injectable()
export class OrderService {
constructor(
@InjectRepository(Order)
private readonly orderRepository: Repository<Order>,
private readonly eventPublisher: EventPublisher,
) {}
@Transactional()
async createOrder(createOrderDto: CreateOrderDto): Promise<Order> {
const order = this.orderRepository.create({
userId: createOrderDto.userId,
totalAmount: 0,
});
const savedOrder = await this.orderRepository.save(order);
// 发布订单创建事件
await this.eventPublisher.publishEvent('order.created', {
orderId: savedOrder.id,
userId: savedOrder.userId,
});
return savedOrder;
}
}
// 消息处理器
/**
 * Periodically delivers unprocessed OutboxMessage rows through the event emitter.
 */
@Injectable()
export class OutboxProcessor {
  constructor(
    @InjectRepository(OutboxMessage)
    private readonly outboxRepository: Repository<OutboxMessage>,
    private readonly eventEmitter: EventEmitter2,
  ) {}

  /**
   * Loads up to 10 unprocessed messages, oldest first, emits each one and marks it
   * processed. A failure on one message is logged and does not stop the batch.
   *
   * NOTE(review): nothing here prevents two overlapping runs from picking up the
   * same rows - confirm whether that can happen in the target deployment.
   */
  @Cron('*/5 * * * * *') // runs every 5 seconds
  async processOutbox(): Promise<void> {
    const messages = await this.outboxRepository.find({
      where: { processed: false },
      order: { createdAt: 'ASC' },
      take: 10, // at most 10 messages per run
    });
    for (const message of messages) {
      try {
        // Wait for the listeners' promises so that a rejecting async handler lands
        // in the catch below instead of the message being marked processed anyway
        await this.eventEmitter.emitAsync(message.eventType, message.payload);
        // Mark as processed
        message.processed = true;
        message.processedAt = new Date();
        await this.outboxRepository.save(message);
      } catch (error) {
        console.error(`Failed to process message ${message.id}:`, error);
        // The row stays unprocessed; a retry mechanism could be added here
      }
    }
  }
}
// 库存服务监听订单创建事件
@EventsHandler('order.created')
export class InventoryEventHandler {
// The event emitter is injected because handleOrderCreated emits a follow-up event.
constructor(
  private readonly inventoryService: InventoryService,
  private readonly eventEmitter: EventEmitter2,
) {}

/**
 * Reserves inventory for the created order and, if that fails, emits
 * 'order.inventory.insufficient' with the original event.
 *
 * @param event - carries the `orderId` and `userId` of the created order.
 *
 * NOTE(review): every failure is reported as insufficient inventory, whatever its
 * cause - confirm that this is intended.
 */
async handleOrderCreated(event: { orderId: number; userId: number }): Promise<void> {
  try {
    // Reserve the inventory for this order
    await this.inventoryService.reserveInventoryForOrder(event.orderId);
  } catch (error) {
    // Keep the cause visible rather than dropping it silently
    console.error(`Failed to reserve inventory for order ${event.orderId}:`, error);
    // Reservation failed: signal it with an inventory-insufficient event
    this.eventEmitter.emit('order.inventory.insufficient', event);
  }
}
}
事务超时和隔离级别设置
在实际应用中,还需要考虑事务的超时和隔离级别:
typescript
// 事务配置
/** Options accepted by CustomTransactional. */
export interface TransactionOptions {
  /** Isolation level to request when the transaction is started. */
  isolation?: 'READ_UNCOMMITTED' | 'READ_COMMITTED' | 'REPEATABLE_READ' | 'SERIALIZABLE';
  /** Statement timeout, in milliseconds. */
  timeout?: number; // milliseconds
}

/**
 * Maps the underscore-separated option names to the space-separated SQL
 * isolation level names handed to `startTransaction`.
 */
const ISOLATION_LEVEL_SQL = {
  READ_UNCOMMITTED: 'READ UNCOMMITTED',
  READ_COMMITTED: 'READ COMMITTED',
  REPEATABLE_READ: 'REPEATABLE READ',
  SERIALIZABLE: 'SERIALIZABLE',
} as const;

/**
 * Method decorator that opens a query runner from the host object's `dataSource`,
 * starts a transaction around the decorated method, commits on success and rolls
 * back on failure. The query runner is always released.
 *
 * @param options - optional isolation level and statement timeout.
 * @returns a method decorator that replaces `descriptor.value` with the wrapper.
 * @throws RangeError at decoration time when `timeout` is not a finite, non-negative number.
 *
 * NOTE(review): the wrapped method is not handed the query runner, so whether its
 * own queries run inside this transaction depends on code outside this block - confirm.
 * NOTE(review): `SET LOCAL statement_timeout` looks PostgreSQL-specific - confirm the target database.
 */
export function CustomTransactional(options?: TransactionOptions): MethodDecorator {
  const timeout = options?.timeout;
  // The timeout is interpolated into SQL below, so reject anything that is not a plain number
  if (timeout !== undefined && (!Number.isFinite(timeout) || timeout < 0)) {
    throw new RangeError(`Transaction timeout must be a non-negative number, got ${timeout}`);
  }
  const isolation = options?.isolation ? ISOLATION_LEVEL_SQL[options.isolation] : undefined;

  return function (target: any, propertyKey: string | symbol, descriptor: PropertyDescriptor) {
    const originalMethod = descriptor.value;
    descriptor.value = async function (this: any, ...args: any[]) {
      const queryRunner = this.dataSource.createQueryRunner();
      try {
        // Connecting and starting happen inside the try so that a failure here
        // still reaches the finally block and releases the query runner
        await queryRunner.connect();
        if (isolation) {
          await queryRunner.startTransaction(isolation);
        } else {
          await queryRunner.startTransaction();
        }
        // Apply the timeout (a timeout of 0 is skipped) and wait for it to take
        // effect before the decorated method runs
        if (timeout) {
          await queryRunner.query(`SET LOCAL statement_timeout = ${Math.trunc(timeout)}`);
        }
        const result = await originalMethod.apply(this, args);
        await queryRunner.commitTransaction();
        return result;
      } catch (error) {
        // Only roll back if a transaction was actually opened
        if (queryRunner.isTransactionActive) {
          await queryRunner.rollbackTransaction();
        }
        throw error;
      } finally {
        await queryRunner.release();
      }
    };
  };
}
// 使用自定义事务装饰器
@Injectable()
export class OrderService {
// Usage example: applies CustomTransactional with REPEATABLE_READ isolation and a
// 30000 ms timeout. The method body is a placeholder in the article.
// NOTE(review): CustomTransactional reads `this.dataSource`, which this class does
// not declare in the lines shown - confirm it is provided.
@CustomTransactional({
  isolation: 'REPEATABLE_READ',
  timeout: 30000, // 30-second timeout
})
async createOrder(createOrderDto: CreateOrderDto): Promise<Order> {
  // Order creation logic goes here
}
}
创业团队行动清单
立即行动:
- 为所有涉及多表操作的业务方法添加事务管理
- 避免在事务中执行长时间运行的操作
- 实现基本的事务超时机制
一周内完成:
- 实现 Saga 模式处理复杂业务流程
- 建立事件驱动的最终一致性机制
- 添加事务隔离级别配置
一月内完善:
- 实现分布式事务的监控和告警
- 建立事务死锁检测和处理机制
- 完善事务相关的日志记录和审计
总结
事务管理是保障数据一致性的关键机制,正确的实现需要:
- 正确使用本地事务:合理使用 @Transactional 装饰器
- 处理分布式事务:采用 Saga 模式或最终一致性方案
- 避免事务陷阱:注意异步回调中的事务丢失问题
- 配置优化:设置合适的事务超时和隔离级别
在下一篇文章中,我们将探讨缓存策略,这是提升系统性能的重要手段。