Манитон Docs
Reference

Ledger Service - Реализация

Детальная реализация Ledger Service

Ledger Service - Реализация

Обзор

Ledger Service — микросервис на NestJS для операционного учета и двойной бухгалтерии (double-entry bookkeeping). Отвечает за управление счетами, операциями, холдами и проводками.

Структура проекта

apps/services/ledger/src/
├── app/
│   ├── app.controller.ts
│   └── app.service.ts
├── app.module.ts
├── config/
│   └── config.module.ts
├── domains/
│   └── ledger/
│       ├── application/
│       │   └── use-cases/
│       ├── domain/
│       │   ├── repositories/
│       │   ├── services/
│       │   └── entities/
│       └── presentation/
│           └── ledger-grpc.gateway.ts
├── infrastructure/
│   ├── database/
│   ├── kafka/
│   └── logging/
├── main.ts
└── shared/

Модули

AppModule

// app.module.ts
import { Module } from '@nestjs/common';
import { ConfigModule } from '@nestjs/config';
import { DrizzleModule } from '@maniton/nestjs-common/database';
import { KafkaModule } from '@maniton/nestjs-common/kafka';
import { LoggingModule } from '@maniton/nestjs-common/logging';
import { LedgerModule } from './domains/ledger/ledger.module';

@Module({
  imports: [
    ConfigModule.forRoot({
      isGlobal: true,
      envFilePath: ['.env.local', '.env'],
    }),
    DrizzleModule.forRoot({
      url: process.env.DATABASE_URL,
    }),
    KafkaModule.forRoot({
      brokers: process.env.KAFKA_BROKERS?.split(',') || ['localhost:9092'],
      clientId: 'ledger-service',
    }),
    LoggingModule.forRoot({
      level: process.env.LOG_LEVEL || 'info',
    }),
    LedgerModule,
  ],
  controllers: [],
  providers: [],
})
export class AppModule {}

Use Cases

CreateOperationUseCase

Создание операции с проводками.

// domains/ledger/application/use-cases/create-operation.use-case.ts
import { Injectable } from '@nestjs/common';
import { OperationRepository } from '../../domain/repositories/operation.repository';
import { PostingRepository } from '../../domain/repositories/posting.repository';
import { EventPublisher } from '../../domain/services/event-publisher';
import { OperationStatus, OperationType } from '@maniton/contracts/gen/ts/maniton/common/v1/common_pb';

@Injectable()
export class CreateOperationUseCase {
  constructor(
    private readonly operationRepository: OperationRepository,
    private readonly postingRepository: PostingRepository,
    private readonly eventPublisher: EventPublisher,
  ) {}

  async execute(request: {
    context: any;
    operation: {
      type: OperationType;
      status: OperationStatus;
      userId: string;
      idempotencyKey: string;
      externalRef?: string;
    };
    postings: Array<{
      debitSubaccountId: string;
      creditSubaccountId: string;
      amount: { amount: string; currencyCode: string };
    }>;
  }): Promise<any> {
    const { context, operation, postings } = request;

    // 1. Проверка идемпотентности
    const existing = await this.operationRepository.findByIdempotencyKey(
      operation.idempotencyKey
    );

    if (existing) {
      return { operation: existing };
    }

    // 2. Создание операции
    const op = await this.operationRepository.create({
      ...operation,
      createdAt: new Date(),
      updatedAt: new Date(),
    });

    // 3. Создание проводок
    for (const posting of postings) {
      await this.postingRepository.create({
        operationId: op.id,
        debitSubaccountId: posting.debitSubaccountId,
        creditSubaccountId: posting.creditSubaccountId,
        amount: posting.amount,
        createdAt: new Date(),
      });
    }

    // 4. Публикация события
    await this.eventPublisher.publish({
      type: 'OperationCreated',
      payload: op,
    });

    return { operation: op };
  }
}

CreateHoldUseCase

Создание холда (блокировки средств).

// domains/ledger/application/use-cases/create-hold.use-case.ts
import { Injectable } from '@nestjs/common';
import { HoldRepository } from '../../domain/repositories/hold.repository';
import { EventPublisher } from '../../domain/services/event-publisher';

@Injectable()
export class CreateHoldUseCase {
  constructor(
    private readonly holdRepository: HoldRepository,
    private readonly eventPublisher: EventPublisher,
  ) {}

  async execute(request: {
    context: any;
    subaccountId: string;
    amount: { amount: string; currencyCode: string };
    reason: string;
  }): Promise<any> {
    const { context, subaccountId, amount, reason } = request;

    // Проверка баланса
    const balance = await this.holdRepository.getBalance(subaccountId, amount.currencyCode);

    if (balance < Number(amount.amount.value)) {
      throw new Error('Insufficient balance');
    }

    // Создание холда
    const hold = await this.holdRepository.create({
      subaccountId,
      amount,
      reason,
      status: 'ACTIVE',
      createdAt: new Date(),
    });

    // Публикация события
    await this.eventPublisher.publish({
      type: 'HoldCreated',
      payload: hold,
    });

    return { hold };
  }
}

ConsumeHoldUseCase

Потребление холда (списание заблокированных средств).

// domains/ledger/application/use-cases/consume-hold.use-case.ts
import { Injectable } from '@nestjs/common';
import { HoldRepository } from '../../domain/repositories/hold.repository';
import { PostingRepository } from '../../domain/repositories/posting.repository';
import { EventPublisher } from '../../domain/services/event-publisher';

@Injectable()
export class ConsumeHoldUseCase {
  constructor(
    private readonly holdRepository: HoldRepository,
    private readonly postingRepository: PostingRepository,
    private readonly eventPublisher: EventPublisher,
  ) {}

  async execute(holdId: string): Promise<void> {
    const hold = await this.holdRepository.findById(holdId);

    if (!hold) {
      throw new Error('Hold not found');
    }

    if (hold.status !== 'ACTIVE') {
      throw new Error('Hold is not active');
    }

    // Создание проводок для потребления холда
    await this.postingRepository.create({
      operationId: hold.operationId,
      debitSubaccountId: hold.subaccountId,
      creditSubaccountId: `${hold.subaccountId}:hold`,
      amount: hold.amount,
      createdAt: new Date(),
    });

    // Обновление статуса холда
    await this.holdRepository.update(holdId, {
      status: 'CONSUMED',
      consumedAt: new Date(),
    });

    // Публикация события
    await this.eventPublisher.publish({
      type: 'HoldConsumed',
      payload: hold,
    });
  }
}

Connect RPC Gateway

LedgerGrpcGateway

// domains/ledger/presentation/ledger-grpc.gateway.ts
import { ConnectGateway, ConnectRouter } from '@maniton/nestjs-common/connect';
import { LedgerService } from '@maniton/contracts/gen/ts/maniton/ledger/v1/ledger_pb';
import type { LedgerUseCases } from './ledger-grpc.gateway';

export class LedgerGrpcGateway extends ConnectGateway {
  protected readonly options = {
    requestPathPrefix: '/ledger-service',
  };

  protected registerRoutes(router: ConnectRouter): void {
    router.service(LedgerService, {
      getAccount: (req) => this.useCases.getAccount.execute(req),
      listAccounts: (req) => this.useCases.listAccounts.execute(req),
      getSubAccount: (req) => this.useCases.getSubAccount.execute(req),
      createOperation: (req) => this.useCases.createOperation.execute(req),
      getOperation: (req) => this.useCases.getOperation.execute(req),
      listOperations: (req) => this.useCases.listOperations.execute(req),
      createHold: (req) => this.useCases.createHold.execute(req),
      releaseHold: (req) => this.useCases.releaseHold.execute(req),
      consumeHold: (req) => this.useCases.consumeHold.execute(req),
    });
  }
}

Репозитории

OperationRepository

// domains/ledger/domain/repositories/operation.repository.ts
import { Injectable } from '@nestjs/common';
import { DrizzleService } from '@maniton/nestjs-common/database';
import { operations } from '@maniton/contracts/gen/drizzle/schema';
import { eq } from 'drizzle-orm';

@Injectable()
export class OperationRepository {
  constructor(private readonly drizzle: DrizzleService) {}

  async findById(id: string) {
    const [operation] = await this.drizzle.db
      .select()
      .from(operations)
      .where(eq(operations.id, id))
      .limit(1);

    return operation;
  }

  async findByIdempotencyKey(idempotencyKey: string) {
    const [operation] = await this.drizzle.db
      .select()
      .from(operations)
      .where(eq(operations.idempotencyKey, idempotencyKey))
      .limit(1);

    return operation;
  }

  async findByTxHash(txHash: string) {
    const [operation] = await this.drizzle.db
      .select()
      .from(operations)
      .where(eq(operations.onchainTxHash, txHash))
      .limit(1);

    return operation;
  }

  async create(data: typeof operations.$inferInsert) {
    const [operation] = await this.drizzle.db
      .insert(operations)
      .values(data)
      .returning();

    return operation;
  }

  async update(id: string, data: Partial<typeof operations.$inferInsert>) {
    const [operation] = await this.drizzle.db
      .update(operations)
      .set(data)
      .where(eq(operations.id, id))
      .returning();

    return operation;
  }

  async list(pagination?: { limit?: number; offset?: number }) {
    return this.drizzle.db
      .select()
      .from(operations)
      .limit(pagination?.limit)
      .offset(pagination?.offset);
  }
}

PostingRepository

// domains/ledger/domain/repositories/posting.repository.ts
import { Injectable } from '@nestjs/common';
import { DrizzleService } from '@maniton/nestjs-common/database';
import { postings } from '@maniton/contracts/gen/drizzle/schema';
import { eq } from 'drizzle-orm';

@Injectable()
export class PostingRepository {
  constructor(private readonly drizzle: DrizzleService) {}

  async create(data: typeof postings.$inferInsert) {
    const [posting] = await this.drizzle.db
      .insert(postings)
      .values(data)
      .returning();

    return posting;
  }

  async findByOperationId(operationId: string) {
    return this.drizzle.db
      .select()
      .from(postings)
      .where(eq(postings.operationId, operationId));
  }

  async getBalance(subaccountId: string, currencyCode: string): Promise<number> {
    const [result] = await this.drizzle.db
      .select({
        balance: sql<number>`COALESCE(SUM(CASE WHEN ${postings.debitSubaccountId} = ${subaccountId} THEN -${postings.amount} ELSE ${postings.amount} END), 0)`,
      })
      .from(postings)
      .where(eq(postings.currencyCode, currencyCode));

    return result?.balance || 0;
  }
}

HoldRepository

// domains/ledger/domain/repositories/hold.repository.ts
import { Injectable } from '@nestjs/common';
import { DrizzleService } from '@maniton/nestjs-common/database';
import { holds } from '@maniton/contracts/gen/drizzle/schema';
import { eq } from 'drizzle-orm';

@Injectable()
export class HoldRepository {
  constructor(private readonly drizzle: DrizzleService) {}

  async findById(id: string) {
    const [hold] = await this.drizzle.db
      .select()
      .from(holds)
      .where(eq(holds.id, id))
      .limit(1);

    return hold;
  }

  async create(data: typeof holds.$inferInsert) {
    const [hold] = await this.drizzle.db
      .insert(holds)
      .values(data)
      .returning();

    return hold;
  }

  async update(id: string, data: Partial<typeof holds.$inferInsert>) {
    const [hold] = await this.drizzle.db
      .update(holds)
      .set(data)
      .where(eq(holds.id, id))
      .returning();

    return hold;
  }

  async list(pagination?: { limit?: number; offset?: number }) {
    return this.drizzle.db
      .select()
      .from(holds)
      .limit(pagination?.limit)
      .offset(pagination?.offset);
  }

  async getBalance(subaccountId: string, currencyCode: string): Promise<number> {
    const [result] = await this.drizzle.db
      .select({
        balance: sql<number>`COALESCE(SUM(CASE WHEN ${postings.debitSubaccountId} = ${subaccountId} THEN -${postings.amount} ELSE ${postings.amount} END), 0)`,
      })
      .from(postings)
      .where(eq(postings.currencyCode, currencyCode));

    return result?.balance || 0;
  }
}

Сервисы

EventPublisher

// domains/ledger/domain/services/event-publisher.ts
import { Injectable } from '@nestjs/common';
import { ProducerService } from '@maniton/nestjs-common/kafka';
import { create, toBinary } from '@bufbuild/protobuf';
import { CloudEventSchema, EventEnvelopeSchema } from '@maniton/contracts/gen/ts/maniton/events/v1/envelope_pb';
import { anyPack } from '@maniton/protobuf/wkt';

@Injectable()
export class EventPublisher {
  constructor(private readonly producer: ProducerService) {}

  async publish(topic: string, eventType: string, payload: any, context: any) {
    const cloudEvent = create(CloudEventSchema, {
      id: crypto.randomUUID(),
      source: 'ledger-service',
      specVersion: '1.0',
      type: eventType,
      dataContentType: 'application/protobuf',
      data: {
        case: 'protoData',
        value: anyPack(eventType, payload),
      },
    });

    const envelope = create(EventEnvelopeSchema, {
      event: cloudEvent,
      context,
      topic,
      schemaSubject: eventType,
    });

    const bytes = toBinary(EventEnvelopeSchema, envelope);

    await this.producer.send(topic, bytes);
  }
}

Мониторинг

Метрики

// infrastructure/metrics/metrics.service.ts
import { Injectable } from '@nestjs/common';
import { Counter, Histogram, Gauge } from 'prom-client';

@Injectable()
export class MetricsService {
  private readonly operationsTotal = new Counter({
    name: 'ledger_operations_total',
    help: 'Total number of ledger operations',
    labelNames: ['type', 'status'],
  });

  private readonly holdsTotal = new Counter({
    name: 'ledger_holds_total',
    help: 'Total number of holds',
    labelNames: ['status'],
  });

  private readonly operationDuration = new Histogram({
    name: 'ledger_operation_duration_seconds',
    help: 'Time spent on ledger operations',
    labelNames: ['operation'],
    buckets: [0.1, 0.5, 1, 2, 5, 10, 30, 60],
  });

  incrementOperation(type: string, status: string) {
    this.operationsTotal.inc({ type, status });
  }

  incrementHold(status: string) {
    this.holdsTotal.inc({ status });
  }

  recordOperationDuration(operation: string, duration: number) {
    this.operationDuration.observe({ operation }, duration);
  }
}

Логи

// infrastructure/logging/logger.service.ts
import { Injectable } from '@nestjs/common';

@Injectable()
export class LoggerService {
  private readonly context = 'LedgerService';

  info(message: string, meta?: Record<string, any>) {
    console.log(`[${this.context}] [INFO] ${message}`, meta);
  }

  error(message: string, error?: Error, meta?: Record<string, any>) {
    if (error) {
      console.error(`[${this.context}] [ERROR] ${message}`, error, meta);
    } else {
      console.error(`[${this.context}] [ERROR] ${message}`, meta);
    }
  }

  warn(message: string, meta?: Record<string, any>) {
    console.warn(`[${this.context}] [WARN] ${message}`, meta);
  }

  debug(message: string, meta?: Record<string, any>) {
    if (process.env.LOG_LEVEL === 'debug') {
      console.debug(`[${this.context}] [DEBUG] ${message}`, meta);
    }
  }
}

Тестирование

Unit Tests

// domains/ledger/application/use-cases/create-operation.use-case.spec.ts
import { Test, TestingModule } from '@nestjs/testing';
import { CreateOperationUseCase } from './create-operation.use-case';
import { OperationRepository } from '../../domain/repositories/operation.repository';
import { PostingRepository } from '../../domain/repositories/posting.repository';

describe('CreateOperationUseCase', () => {
  let useCase: CreateOperationUseCase;
  let operationRepository: jest.Mocked<OperationRepository>;
  let postingRepository: jest.Mocked<PostingRepository>;

  beforeEach(async () => {
    const module: TestingModule = await Test.createTestingModule({
      providers: [
        CreateOperationUseCase,
        {
          provide: OperationRepository,
          useValue: {
            findByIdempotencyKey: jest.fn(),
            create: jest.fn(),
          },
        },
        {
          provide: PostingRepository,
          useValue: {
            create: jest.fn(),
          },
        },
      ],
    }).compile();

    useCase = module.get(CreateOperationUseCase);
    operationRepository = module.get(OperationRepository);
    postingRepository = module.get(PostingRepository);
  });

  it('should create operation', async () => {
    const request = {
      context: { requestId: '1', correlationId: '1', idempotencyKey: '1' },
      operation: {
        type: 'ISSUANCE',
        status: 'PROCESSING',
        userId: 'user-123',
        idempotencyKey: '1',
      },
      postings: [
        {
          debitSubaccountId: 'user-123:CFA-RUB',
          creditSubaccountId: 'system:reserve:CFA-RUB',
          amount: { amount: '1000', currencyCode: 'RUB' },
        },
      ],
    };

    const result = await useCase.execute(request);

    expect(operationRepository.create).toHaveBeenCalled();
    expect(postingRepository.create).toHaveBeenCalled();
  });
});

Troubleshooting

Проблема: Баланс не обновляется

Решение:

  1. Проверьте, что проводки созданы
  2. Проверьте, что subaccount IDs верные
  3. Проверьте SQL запрос для расчета баланса

Проблема: Холд не создается

Решение:

  1. Проверьте, что баланс достаточен
  2. Проверьте, что subaccount существует
  3. Проверьте логи HoldRepository

Дополнительные ресурсы

On this page