
Part 7: Transaction Management, the Baseline for Data Consistency

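In the previous articles we covered important topics such as input validation. Now let's turn to another key cross-cutting concern: transaction management.

Transactions are the baseline guarantee of data consistency, especially in business flows that touch several pieces of data at once. Handled incorrectly, they lead to inconsistent data, dirty reads, non-repeatable reads, and similar anomalies.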

Local Transactions: Using @Transactional Correctly

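In a NestJS project that uses TypeORM, transactions can be managed declaratively with decorators. TypeORM's own @Transaction decorator was removed in version 0.3, so the @Transactional decorator used below is assumed to come from a companion package such as typeorm-transactional. Let's look at how to use it correctly: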

typescript
// The wrong way: managing the transaction by hand
@Injectable()
export class OrderService {
  constructor(
    @InjectRepository(Order)
    private readonly orderRepository: Repository<Order>,
    @InjectRepository(OrderItem)
    private readonly orderItemRepository: Repository<OrderItem>,
    @InjectRepository(Product)
    private readonly productRepository: Repository<Product>,
  ) {}

  async createOrder(createOrderDto: CreateOrderDto): Promise<Order> {
    // Hand-rolled transaction management is error-prone
    const queryRunner = this.orderRepository.manager.connection.createQueryRunner();
    await queryRunner.connect();
    await queryRunner.startTransaction();

    try {
      // Create the order
      const order = new Order();
      order.userId = createOrderDto.userId;
      order.totalAmount = 0;
      const savedOrder = await queryRunner.manager.save(order);

      // Create the order items and update stock
      let totalAmount = 0;
      for (const itemDto of createOrderDto.items) {
        const product = await queryRunner.manager.findOneBy(Product, { id: itemDto.productId });
        if (!product || product.stock < itemDto.quantity) {
          throw new BusinessException(
            BusinessErrorCode.BUSINESS_LOGIC_ERROR,
            `Insufficient stock for product ${itemDto.productId}`,
          );
        }

        // Update stock
        product.stock -= itemDto.quantity;
        await queryRunner.manager.save(product);

        // Create the order item
        const orderItem = new OrderItem();
        orderItem.orderId = savedOrder.id;
        orderItem.productId = itemDto.productId;
        orderItem.quantity = itemDto.quantity;
        orderItem.price = product.price;
        await queryRunner.manager.save(orderItem);

        totalAmount += itemDto.quantity * product.price;
      }

      // Update the order total
      savedOrder.totalAmount = totalAmount;
      await queryRunner.manager.save(savedOrder);

      await queryRunner.commitTransaction();
      return savedOrder;
    } catch (error) {
      await queryRunner.rollbackTransaction();
      throw error;
    } finally {
      await queryRunner.release();
    }
  }
}

// The right way: use the @Transactional decorator
@Injectable()
export class OrderService {
  constructor(
    @InjectRepository(Order)
    private readonly orderRepository: Repository<Order>,
    @InjectRepository(OrderItem)
    private readonly orderItemRepository: Repository<OrderItem>,
    @InjectRepository(Product)
    private readonly productRepository: Repository<Product>,
  ) {}

  @Transactional()
  async createOrder(createOrderDto: CreateOrderDto): Promise<Order> {
    // Create the order
    const order = this.orderRepository.create({
      userId: createOrderDto.userId,
      totalAmount: 0,
    });
    const savedOrder = await this.orderRepository.save(order);

    // Create the order items and update stock
    let totalAmount = 0;
    for (const itemDto of createOrderDto.items) {
      const product = await this.productRepository.findOneBy({ id: itemDto.productId });
      if (!product || product.stock < itemDto.quantity) {
        throw new BusinessException(
          BusinessErrorCode.BUSINESS_LOGIC_ERROR,
          `Insufficient stock for product ${itemDto.productId}`,
        );
      }

      // Update stock
      product.stock -= itemDto.quantity;
      await this.productRepository.save(product);

      // Create the order item
      const orderItem = this.orderItemRepository.create({
        orderId: savedOrder.id,
        productId: itemDto.productId,
        quantity: itemDto.quantity,
        price: product.price,
      });
      await this.orderItemRepository.save(orderItem);

      totalAmount += itemDto.quantity * product.price;
    }

    // Update the order total
    savedOrder.totalAmount = totalAmount;
    await this.orderRepository.save(savedOrder);

    return savedOrder;
  }
}
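
To make the all-or-nothing behaviour concrete, here is a minimal, dependency-free sketch. InMemoryStore, runInTransaction and reserveAll are hypothetical names made up for this illustration, not TypeORM or typeorm-transactional APIs, and a real database rolls back through its own log rather than a snapshot copy.

```typescript
// Hypothetical in-memory stand-in for a table of product stock.
class InMemoryStore {
  private stock = new Map<number, number>();

  setStock(productId: number, quantity: number): void {
    this.stock.set(productId, quantity);
  }

  getStock(productId: number): number {
    return this.stock.get(productId) ?? 0;
  }

  // Copy the current state so it can be restored on rollback.
  snapshot(): Map<number, number> {
    return new Map(this.stock);
  }

  restore(snapshot: Map<number, number>): void {
    this.stock = new Map(snapshot);
  }
}

// Runs `work`; keeps its writes if it resolves, restores the snapshot if it throws.
async function runInTransaction<T>(store: InMemoryStore, work: () => Promise<T>): Promise<T> {
  const before = store.snapshot();
  try {
    return await work();
  } catch (error) {
    store.restore(before);
    throw error;
  }
}

// Decrements stock item by item and fails midway if any product runs short.
async function reserveAll(
  store: InMemoryStore,
  items: { productId: number; quantity: number }[],
): Promise<void> {
  for (const item of items) {
    const available = store.getStock(item.productId);
    if (available < item.quantity) {
      throw new Error(`Insufficient stock for product ${item.productId}`);
    }
    store.setStock(item.productId, available - item.quantity);
  }
}
```

If the second item in an order fails the stock check, the decrement already applied to the first item is undone as well, which is the guarantee the decorator is meant to give createOrder.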

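Distributed Transactions: The Saga Pattern in Practice (Order → Deduct Stock → Issue Coupon)

In a microservice architecture, distributed transactions are a hard problem. The Saga pattern is a common solution: each step commits locally and registers a compensating action that undoes it if a later step fails: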

typescript
// Saga pattern implementation
export interface SagaStep<T> {
  action: (data: T) => Promise<any>;
  compensation: (data: T) => Promise<any>;
}

export class SagaOrchestrator {
  private steps: SagaStep<any>[] = [];

  addStep<T>(step: SagaStep<T>): SagaOrchestrator {
    this.steps.push(step);
    return this;
  }

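  // Returns the data produced by the last step; typed loosely because each step may add fields
  async execute<T>(initialData: T): Promise<any> {
    const executedSteps: SagaStep<any>[] = [];
    let currentData: any = initialData;

    try {
      for (const step of this.steps) {
        // Carry each step's result forward so later steps and compensations can see it (e.g. orderId)
        currentData = (await step.action(currentData)) ?? currentData;
        executedSteps.push(step);
      }
      return currentData;
    } catch (error) {
      // Run the compensations for the completed steps, in reverse order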
      for (let i = executedSteps.length - 1; i >= 0; i--) {
        try {
          await executedSteps[i].compensation(currentData);
        } catch (compensationError) {
          console.error('Compensation failed:', compensationError);
        }
      }
      throw error;
    }
  }
}

// Order service
@Injectable()
export class OrderSagaService {
  constructor(
    private readonly orderService: OrderService,
    private readonly inventoryService: InventoryService,
    private readonly couponService: CouponService,
  ) {}

  async createOrderWithSaga(createOrderDto: CreateOrderDto): Promise<Order> {
    const saga = new SagaOrchestrator();

    // Step 1: create the order
    saga.addStep({
      action: async (data: { orderDto: CreateOrderDto }) => {
        const order = await this.orderService.createOrder(data.orderDto);
        return { ...data, orderId: order.id };
      },
      compensation: async (data: { orderId: number }) => {
        if (data.orderId) {
          await this.orderService.cancelOrder(data.orderId);
        }
      },
    });

    // Step 2: deduct stock
    saga.addStep({
      action: async (data: { orderDto: CreateOrderDto; orderId: number }) => {
        const inventoryUpdates = data.orderDto.items.map(item => ({
          productId: item.productId,
          quantity: item.quantity,
        }));
        
        await this.inventoryService.reserveInventory(inventoryUpdates);
        return data;
      },
      compensation: async (data: { orderDto: CreateOrderDto }) => {
        const inventoryUpdates = data.orderDto.items.map(item => ({
          productId: item.productId,
          quantity: item.quantity,
        }));
        
        await this.inventoryService.releaseInventory(inventoryUpdates);
      },
    });

    // Step 3: issue the coupon
    saga.addStep({
      action: async (data: { orderId: number }) => {
        await this.couponService.issueCouponForOrder(data.orderId);
        return data;
      },
      compensation: async (data: { orderId: number }) => {
        await this.couponService.revokeCouponForOrder(data.orderId);
      },
    });

    const result = await saga.execute({ orderDto: createOrderDto });
    return this.orderService.getOrder(result.orderId);
  }
}

// Inventory service
@Injectable()
export class InventoryService {
  constructor(
    @InjectRepository(Product)
    private readonly productRepository: Repository<Product>,
  ) {}

  @Transactional()
  async reserveInventory(updates: { productId: number; quantity: number }[]): Promise<void> {
    for (const update of updates) {
      const product = await this.productRepository.findOneBy({ id: update.productId });
      if (!product) {
        throw new BusinessException(
          BusinessErrorCode.RESOURCE_NOT_FOUND,
          `Product not found: ${update.productId}`,
        );
      }

      if (product.stock < update.quantity) {
        throw new BusinessException(
          BusinessErrorCode.BUSINESS_LOGIC_ERROR,
          `Insufficient stock for product ${update.productId}`,
        );
      }

      product.stock -= update.quantity;
      await this.productRepository.save(product);
    }
  }

  @Transactional()
  async releaseInventory(updates: { productId: number; quantity: number }[]): Promise<void> {
    for (const update of updates) {
      const product = await this.productRepository.findOneBy({ id: update.productId });
      if (product) {
        product.stock += update.quantity;
        await this.productRepository.save(product);
      }
    }
  }
}

// Coupon service
@Injectable()
export class CouponService {
  constructor(
    @InjectRepository(Coupon)
    private readonly couponRepository: Repository<Coupon>,
  ) {}

  @Transactional()
  async issueCouponForOrder(orderId: number): Promise<void> {
    const coupon = this.couponRepository.create({
      orderId,
      code: `COUPON-${orderId}-${Date.now()}`,
      amount: 10, // fixed-amount coupon
      status: 'issued',
    });
    
    await this.couponRepository.save(coupon);
  }

  @Transactional()
  async revokeCouponForOrder(orderId: number): Promise<void> {
    await this.couponRepository.update(
      { orderId },
      { status: 'revoked' },
    );
  }
}
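
To see the compensation order without any services or a database, the sketch below runs a stripped-down saga whose third step fails. runSaga, SagaStepSketch, OrderContext and buildOrderSteps are hypothetical names for this illustration only, and the order id 42 is a made-up value.

```typescript
interface SagaStepSketch<C> {
  name: string;
  action: (ctx: C) => Promise<C>;
  compensation: (ctx: C) => Promise<void>;
}

// Runs steps in order, threading the context through each one.
// On failure, compensates the steps that completed, newest first, then rethrows.
async function runSaga<C>(steps: SagaStepSketch<C>[], initial: C): Promise<C> {
  const done: SagaStepSketch<C>[] = [];
  let ctx = initial;
  try {
    for (const step of steps) {
      ctx = await step.action(ctx);
      done.push(step);
    }
    return ctx;
  } catch (error) {
    for (const step of done.reverse()) {
      try {
        await step.compensation(ctx);
      } catch (compensationError) {
        // A failed compensation needs manual follow-up; here it is only logged.
        console.error(`Compensation failed for ${step.name}:`, compensationError);
      }
    }
    throw error;
  }
}

interface OrderContext {
  orderId?: number;
  log: string[]; // records what ran, in order
}

// Builds the three order steps; `failAt` names the step that should throw.
function buildOrderSteps(failAt?: string): SagaStepSketch<OrderContext>[] {
  const step = (
    name: string,
    apply: (ctx: OrderContext) => OrderContext,
  ): SagaStepSketch<OrderContext> => ({
    name,
    action: async ctx => {
      if (name === failAt) throw new Error(`${name} failed`);
      ctx.log.push(name);
      return apply(ctx);
    },
    compensation: async ctx => {
      ctx.log.push(`undo ${name}`);
    },
  });

  return [
    step('createOrder', ctx => ({ ...ctx, orderId: 42 })),
    step('reserveInventory', ctx => ctx),
    step('issueCoupon', ctx => ctx),
  ];
}
```

Running buildOrderSteps('issueCoupon') leaves the log as createOrder, reserveInventory, undo reserveInventory, undo createOrder: the failed step itself is not compensated, only the steps that completed before it.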

Watch Out: Transactions Lost in Async Callbacks

In asynchronous code, work can escape the transaction context and leave data inconsistent:

typescript
// Wrong: the transaction is lost in an async callback
@Injectable()
export class OrderService {
  constructor(
    @InjectRepository(Order)
    private readonly orderRepository: Repository<Order>,
  ) {}

  @Transactional()
  async processOrder(orderId: number): Promise<void> {
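    const order = await this.orderRepository.findOneBy({ id: orderId });
    order.status = 'processing';
    await this.orderRepository.save(order);

    // Async work: the transaction context is lost
    setTimeout(async () => {
      // By the time this fires, processOrder has returned, so this code is not in the transaction!
      order.status = 'completed';
      await this.orderRepository.save(order); // runs on its own, outside the original transaction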
    }, 1000);
  }
}

// Right: keep the work inside the transaction context
@Injectable()
export class OrderService {
  constructor(
    @InjectRepository(Order)
    private readonly orderRepository: Repository<Order>,
  ) {}

  @Transactional()
  async processOrder(orderId: number): Promise<void> {
    const order = await this.orderRepository.findOneBy({ id: orderId });
    order.status = 'processing';
    await this.orderRepository.save(order);

    // Finish all the work inside the transaction
    await this.performAsyncProcessing(order);
    
    order.status = 'completed';
    await this.orderRepository.save(order);
  }

  private async performAsyncProcessing(order: Order): Promise<void> {
    // Simulate asynchronous processing
    return new Promise(resolve => {
      setTimeout(() => {
        // Processing logic
        resolve();
      }, 1000);
    });
  }
}

// Alternatively, take an event-driven approach
@Injectable()
export class OrderService {
  constructor(
    @InjectRepository(Order)
    private readonly orderRepository: Repository<Order>,
    private readonly eventEmitter: EventEmitter2,
  ) {}

  @Transactional()
  async processOrder(orderId: number): Promise<void> {
    const order = await this.orderRepository.findOneBy({ id: orderId });
    order.status = 'processing';
    await this.orderRepository.save(order);

    // Emit an event and let the event handler take care of the follow-up work
    this.eventEmitter.emit('order.processing', { orderId });
  }
}

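// Listener registered with @OnEvent from @nestjs/event-emitter, which pairs with EventEmitter2
@Injectable()
export class OrderProcessingHandler {
  constructor(
    @InjectRepository(Order)
    private readonly orderRepository: Repository<Order>,
  ) {}

  @OnEvent('order.processing')
  @Transactional()
  async handleOrderProcessing(event: { orderId: number }): Promise<void> {
    // Intended to run as its own transactional unit. Note that emit() calls listeners
    // synchronously, so this starts before the emitting method has committed.
    const order = await this.orderRepository.findOneBy({ id: event.orderId });
    if (!order) {
      return;
    }
    // Run the processing logic
    order.status = 'completed';
    await this.orderRepository.save(order);
  }
}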
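
The timing problem can be reproduced without a database. In the sketch below, withLoggedTransaction is a hypothetical wrapper that only records where the transaction begins and ends, so the log shows whether a delayed write landed before or after the commit.

```typescript
const sleep = (ms: number) => new Promise<void>(resolve => setTimeout(resolve, ms));

// Hypothetical wrapper: records transaction boundaries instead of talking to a database.
async function withLoggedTransaction(log: string[], work: () => Promise<void>): Promise<void> {
  log.push('BEGIN');
  try {
    await work();
    log.push('COMMIT');
  } catch (error) {
    log.push('ROLLBACK');
    throw error;
  }
}

// Fire-and-forget: nothing awaits the timer, so its write lands after COMMIT.
async function fireAndForget(log: string[]): Promise<void> {
  await withLoggedTransaction(log, async () => {
    log.push('write: processing');
    setTimeout(() => log.push('write: completed'), 10);
  });
}

// Awaited: the delayed write finishes before the wrapper commits.
async function awaited(log: string[]): Promise<void> {
  await withLoggedTransaction(log, async () => {
    log.push('write: processing');
    await sleep(10);
    log.push('write: completed');
  });
}
```

With fireAndForget the log ends with COMMIT followed by write: completed, so a failure in that late write can no longer roll anything back. With awaited, the write sits between BEGIN and COMMIT.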

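Startup Advice: Avoid Distributed Transactions Where You Can and Fall Back on Eventual Consistency

For startup teams, it is best to avoid complex distributed transactions as far as possible and adopt an eventual-consistency approach instead: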

typescript
// Eventual-consistency example (transactional outbox)
@Entity()
export class OutboxMessage {
  @PrimaryGeneratedColumn()
  id: number;

  @Column()
  eventType: string;

  @Column({ type: 'json' })
  payload: any;

  @Column({ default: false })
  processed: boolean;

  @Column({ type: 'timestamp', default: () => 'CURRENT_TIMESTAMP' })
  createdAt: Date;

  @Column({ type: 'timestamp', nullable: true })
  processedAt: Date;
}

@Injectable()
export class EventPublisher {
  constructor(
    @InjectRepository(OutboxMessage)
    private readonly outboxRepository: Repository<OutboxMessage>,
  ) {}

  @Transactional()
  async publishEvent(eventType: string, payload: any): Promise<void> {
    // Save the message to the outbox inside the business transaction
    const message = this.outboxRepository.create({
      eventType,
      payload,
    });
    await this.outboxRepository.save(message);
  }
}

@Injectable()
export class OrderService {
  constructor(
    @InjectRepository(Order)
    private readonly orderRepository: Repository<Order>,
    private readonly eventPublisher: EventPublisher,
  ) {}

  @Transactional()
  async createOrder(createOrderDto: CreateOrderDto): Promise<Order> {
    const order = this.orderRepository.create({
      userId: createOrderDto.userId,
      totalAmount: 0,
    });
    const savedOrder = await this.orderRepository.save(order);

    // Publish the order-created event
    await this.eventPublisher.publishEvent('order.created', {
      orderId: savedOrder.id,
      userId: savedOrder.userId,
    });

    return savedOrder;
  }
}

// Outbox message processor
@Injectable()
export class OutboxProcessor {
  constructor(
    @InjectRepository(OutboxMessage)
    private readonly outboxRepository: Repository<OutboxMessage>,
    private readonly eventEmitter: EventEmitter2,
  ) {}

  @Cron('*/5 * * * * *') // Runs every 5 seconds
  async processOutbox(): Promise<void> {
    const messages = await this.outboxRepository.find({
      where: { processed: false },
      order: { createdAt: 'ASC' },
      take: 10, // Process at most 10 messages per run
    });

    for (const message of messages) {
      try {
        // Emit the event and wait for async listeners before marking it processed
        await this.eventEmitter.emitAsync(message.eventType, message.payload);

        // Mark as processed
        message.processed = true;
        message.processedAt = new Date();
        await this.outboxRepository.save(message);
      } catch (error) {
        console.error(`Failed to process message ${message.id}:`, error);
        // The message stays unprocessed, so the next run picks it up again; a retry limit could be added here
      }
    }
  }
}

// The inventory side listens for the order-created event
@Injectable()
export class InventoryEventHandler {
  constructor(
    private readonly inventoryService: InventoryService,
    private readonly eventEmitter: EventEmitter2,
  ) {}

  @OnEvent('order.created')
  async handleOrderCreated(event: { orderId: number; userId: number }): Promise<void> {
    try {
      // Deduct stock for the order
      await this.inventoryService.reserveInventoryForOrder(event.orderId);
    } catch (error) {
      // On failure, emit an event (e.g. insufficient stock) so the order can be compensated
      this.eventEmitter.emit('order.inventory.insufficient', event);
    }
  }
}
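
A poller that emits first and marks the message processed afterwards can deliver the same message twice, for example if the process crashes between the two steps. Consumers therefore need to tolerate duplicates. The sketch below shows one way to do that, under the assumption that every outbox message carries a unique id. IdempotentConsumer and OutboxEvent are hypothetical names, and a production version would keep the set of handled ids in a table with a unique constraint rather than in memory.

```typescript
interface OutboxEvent {
  id: number; // unique id of the outbox row
  eventType: string;
  payload: { orderId: number };
}

// Applies each event at most once by remembering the ids it has already handled.
class IdempotentConsumer {
  private readonly handledIds = new Set<number>();
  readonly reservedOrderIds: number[] = [];

  // Returns true if the event was applied, false if it was a duplicate delivery.
  async handle(event: OutboxEvent): Promise<boolean> {
    if (this.handledIds.has(event.id)) {
      return false;
    }
    // Stand-in for the real side effect, such as reserving stock for the order.
    this.reservedOrderIds.push(event.payload.orderId);
    this.handledIds.add(event.id);
    return true;
  }
}
```

Delivering the same event twice changes the state only once, which is what makes it safe for the poller to retry.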

Transaction Timeouts and Isolation Levels

In real applications you also need to think about transaction timeouts and isolation levels:

typescript
// Transaction options
// TypeORM's isolation level names use spaces, matching the SQL keywords
export interface TransactionOptions {
  isolation?: 'READ UNCOMMITTED' | 'READ COMMITTED' | 'REPEATABLE READ' | 'SERIALIZABLE';
  timeout?: number; // milliseconds
}

// Custom transaction decorator
// Assumes the decorated class has an injected `dataSource` property.
// Note: queries in the wrapped method only join this transaction if they go through
// this query runner's manager; wiring that up is outside the scope of this sketch.
export function CustomTransactional(options?: TransactionOptions): MethodDecorator {
  return function (target: any, propertyKey: string | symbol, descriptor: PropertyDescriptor) {
    const originalMethod = descriptor.value;

    descriptor.value = async function (this: any, ...args: any[]) {
      const queryRunner = this.dataSource.createQueryRunner();
      await queryRunner.connect();

      if (options?.isolation) {
        await queryRunner.startTransaction(options.isolation);
      } else {
        await queryRunner.startTransaction();
      }

      try {
        // Set the timeout (SET LOCAL statement_timeout is PostgreSQL-specific)
        if (options?.timeout) {
          await queryRunner.query(`SET LOCAL statement_timeout = ${options.timeout}`);
        }

        const result = await originalMethod.apply(this, args);
        await queryRunner.commitTransaction();
        return result;
      } catch (error) {
        await queryRunner.rollbackTransaction();
        throw error;
      } finally {
        await queryRunner.release();
      }
    };
  };
}

// Using the custom transaction decorator
@Injectable()
export class OrderService {
  // The decorator reads this.dataSource at call time
  constructor(private readonly dataSource: DataSource) {}

  @CustomTransactional({
    isolation: 'REPEATABLE READ',
    timeout: 30000, // 30-second timeout
  })
  async createOrder(createOrderDto: CreateOrderDto): Promise<Order> {
    // Order creation logic
  }
}
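
A database-side statement timeout can be paired with an application-side deadline. The sketch below is one possible shape for that, using hypothetical names (withTimeout, TransactionTimeoutError). It only stops waiting for the work. It does not cancel a query that is already running, so the caller still has to roll back when it fires.

```typescript
class TransactionTimeoutError extends Error {
  constructor(ms: number) {
    super(`Transaction exceeded ${ms} ms`);
    this.name = 'TransactionTimeoutError';
  }
}

// Resolves with the work's result, or rejects once `ms` milliseconds have passed.
async function withTimeout<T>(work: Promise<T>, ms: number): Promise<T> {
  let timer: ReturnType<typeof setTimeout> | undefined;
  const deadline = new Promise<never>((_, reject) => {
    timer = setTimeout(() => reject(new TransactionTimeoutError(ms)), ms);
  });
  try {
    return await Promise.race([work, deadline]);
  } finally {
    // Always clear the timer so it cannot keep the process alive.
    clearTimeout(timer);
  }
}
```

Wrapping the body of a transactional method in withTimeout turns a hung call into an ordinary rejection, which the usual catch branch can then roll back.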

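Action Checklist for Startup Teams

  1. Act now

    • Add transaction management to every business method that writes to multiple tables
    • Avoid long-running operations inside a transaction
    • Implement a basic transaction timeout mechanism
  2. Within one week

    • Implement the Saga pattern for complex business flows
    • Build an event-driven eventual-consistency mechanism
    • Add transaction isolation level configuration
  3. Within one month

    • Set up monitoring and alerting for distributed transactions
    • Put deadlock detection and handling in place
    • Round out transaction-related logging and auditing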
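
For the deadlock-handling item, one common approach is to re-run the whole transaction when the database reports a deadlock. The sketch below assumes PostgreSQL, and assumes the driver error exposes the SQLSTATE as a string `code` property. retryOnDeadlock and isRetryable are hypothetical helper names.

```typescript
// SQLSTATE codes PostgreSQL uses for deadlock_detected and serialization_failure.
const RETRYABLE_CODES = new Set(['40P01', '40001']);

// Assumption: the thrown error carries the SQLSTATE in a string `code` property.
function isRetryable(error: unknown): boolean {
  const code = (error as { code?: unknown } | null)?.code;
  return typeof code === 'string' && RETRYABLE_CODES.has(code);
}

// Re-runs `work` (the whole transaction, not a single statement) on retryable errors.
async function retryOnDeadlock<T>(
  work: () => Promise<T>,
  maxAttempts = 3,
  baseDelayMs = 20,
): Promise<T> {
  for (let attempt = 1; ; attempt++) {
    try {
      return await work();
    } catch (error) {
      if (attempt >= maxAttempts || !isRetryable(error)) {
        throw error;
      }
      // Back off a little longer on each attempt before retrying.
      await new Promise<void>(resolve => setTimeout(resolve, baseDelayMs * attempt));
    }
  }
}
```

The function passed in should open and commit its own transaction, so that every retry starts from a clean state.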

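Summary

Transaction management is a key mechanism for keeping data consistent. Getting it right means:

  1. Using local transactions correctly: apply the @Transactional decorator sensibly
  2. Handling distributed transactions: adopt the Saga pattern or an eventual-consistency approach
  3. Avoiding transaction pitfalls: watch for transactions lost in async callbacks
  4. Tuning configuration: set appropriate transaction timeouts and isolation levels

In the next article we will look at caching strategies, an important way to improve system performance.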