Reference
Ledger Service - Реализация
Детальная реализация Ledger Service
Ledger Service - Реализация
Обзор
Ledger Service — микросервис на NestJS для операционного учета и двойной бухгалтерии (double-entry bookkeeping). Отвечает за управление счетами, операциями, холдами и проводками.
Структура проекта
apps/services/ledger/src/
├── app/
│ ├── app.controller.ts
│ └── app.service.ts
├── app.module.ts
├── config/
│ └── config.module.ts
├── domains/
│ └── ledger/
│ ├── application/
│ │ └── use-cases/
│ ├── domain/
│ │ ├── repositories/
│ │ ├── services/
│ │ └── entities/
│ └── presentation/
│ └── ledger-grpc.gateway.ts
├── infrastructure/
│ ├── database/
│ ├── kafka/
│ └── logging/
├── main.ts
└── shared/Модули
AppModule
// app.module.ts
import { Module } from '@nestjs/common';
import { ConfigModule } from '@nestjs/config';
import { DrizzleModule } from '@maniton/nestjs-common/database';
import { KafkaModule } from '@maniton/nestjs-common/kafka';
import { LoggingModule } from '@maniton/nestjs-common/logging';
import { LedgerModule } from './domains/ledger/ledger.module';
@Module({
imports: [
ConfigModule.forRoot({
isGlobal: true,
envFilePath: ['.env.local', '.env'],
}),
DrizzleModule.forRoot({
url: process.env.DATABASE_URL,
}),
KafkaModule.forRoot({
brokers: process.env.KAFKA_BROKERS?.split(',') || ['localhost:9092'],
clientId: 'ledger-service',
}),
LoggingModule.forRoot({
level: process.env.LOG_LEVEL || 'info',
}),
LedgerModule,
],
controllers: [],
providers: [],
})
export class AppModule {}Use Cases
CreateOperationUseCase
Создание операции с проводками.
// domains/ledger/application/use-cases/create-operation.use-case.ts
import { Injectable } from '@nestjs/common';
import { OperationRepository } from '../../domain/repositories/operation.repository';
import { PostingRepository } from '../../domain/repositories/posting.repository';
import { EventPublisher } from '../../domain/services/event-publisher';
import { OperationStatus, OperationType } from '@maniton/contracts/gen/ts/maniton/common/v1/common_pb';
@Injectable()
export class CreateOperationUseCase {
constructor(
private readonly operationRepository: OperationRepository,
private readonly postingRepository: PostingRepository,
private readonly eventPublisher: EventPublisher,
) {}
async execute(request: {
context: any;
operation: {
type: OperationType;
status: OperationStatus;
userId: string;
idempotencyKey: string;
externalRef?: string;
};
postings: Array<{
debitSubaccountId: string;
creditSubaccountId: string;
amount: { amount: string; currencyCode: string };
}>;
}): Promise<any> {
const { context, operation, postings } = request;
// 1. Проверка идемпотентности
const existing = await this.operationRepository.findByIdempotencyKey(
operation.idempotencyKey
);
if (existing) {
return { operation: existing };
}
// 2. Создание операции
const op = await this.operationRepository.create({
...operation,
createdAt: new Date(),
updatedAt: new Date(),
});
// 3. Создание проводок
for (const posting of postings) {
await this.postingRepository.create({
operationId: op.id,
debitSubaccountId: posting.debitSubaccountId,
creditSubaccountId: posting.creditSubaccountId,
amount: posting.amount,
createdAt: new Date(),
});
}
// 4. Публикация события
await this.eventPublisher.publish({
type: 'OperationCreated',
payload: op,
});
return { operation: op };
}
}CreateHoldUseCase
Создание холда (блокировки средств).
// domains/ledger/application/use-cases/create-hold.use-case.ts
import { Injectable } from '@nestjs/common';
import { HoldRepository } from '../../domain/repositories/hold.repository';
import { EventPublisher } from '../../domain/services/event-publisher';
@Injectable()
export class CreateHoldUseCase {
constructor(
private readonly holdRepository: HoldRepository,
private readonly eventPublisher: EventPublisher,
) {}
async execute(request: {
context: any;
subaccountId: string;
amount: { amount: string; currencyCode: string };
reason: string;
}): Promise<any> {
const { context, subaccountId, amount, reason } = request;
// Проверка баланса
const balance = await this.holdRepository.getBalance(subaccountId, amount.currencyCode);
if (balance < Number(amount.amount.value)) {
throw new Error('Insufficient balance');
}
// Создание холда
const hold = await this.holdRepository.create({
subaccountId,
amount,
reason,
status: 'ACTIVE',
createdAt: new Date(),
});
// Публикация события
await this.eventPublisher.publish({
type: 'HoldCreated',
payload: hold,
});
return { hold };
}
}ConsumeHoldUseCase
Потребление холда (списание заблокированных средств).
// domains/ledger/application/use-cases/consume-hold.use-case.ts
import { Injectable } from '@nestjs/common';
import { HoldRepository } from '../../domain/repositories/hold.repository';
import { PostingRepository } from '../../domain/repositories/posting.repository';
import { EventPublisher } from '../../domain/services/event-publisher';
@Injectable()
export class ConsumeHoldUseCase {
constructor(
private readonly holdRepository: HoldRepository,
private readonly postingRepository: PostingRepository,
private readonly eventPublisher: EventPublisher,
) {}
async execute(holdId: string): Promise<void> {
const hold = await this.holdRepository.findById(holdId);
if (!hold) {
throw new Error('Hold not found');
}
if (hold.status !== 'ACTIVE') {
throw new Error('Hold is not active');
}
// Создание проводок для потребления холда
await this.postingRepository.create({
operationId: hold.operationId,
debitSubaccountId: hold.subaccountId,
creditSubaccountId: `${hold.subaccountId}:hold`,
amount: hold.amount,
createdAt: new Date(),
});
// Обновление статуса холда
await this.holdRepository.update(holdId, {
status: 'CONSUMED',
consumedAt: new Date(),
});
// Публикация события
await this.eventPublisher.publish({
type: 'HoldConsumed',
payload: hold,
});
}
}Connect RPC Gateway
LedgerGrpcGateway
// domains/ledger/presentation/ledger-grpc.gateway.ts
import { ConnectGateway, ConnectRouter } from '@maniton/nestjs-common/connect';
import { LedgerService } from '@maniton/contracts/gen/ts/maniton/ledger/v1/ledger_pb';
import type { LedgerUseCases } from './ledger-grpc.gateway';
export class LedgerGrpcGateway extends ConnectGateway {
protected readonly options = {
requestPathPrefix: '/ledger-service',
};
protected registerRoutes(router: ConnectRouter): void {
router.service(LedgerService, {
getAccount: (req) => this.useCases.getAccount.execute(req),
listAccounts: (req) => this.useCases.listAccounts.execute(req),
getSubAccount: (req) => this.useCases.getSubAccount.execute(req),
createOperation: (req) => this.useCases.createOperation.execute(req),
getOperation: (req) => this.useCases.getOperation.execute(req),
listOperations: (req) => this.useCases.listOperations.execute(req),
createHold: (req) => this.useCases.createHold.execute(req),
releaseHold: (req) => this.useCases.releaseHold.execute(req),
consumeHold: (req) => this.useCases.consumeHold.execute(req),
});
}
}Репозитории
OperationRepository
// domains/ledger/domain/repositories/operation.repository.ts
import { Injectable } from '@nestjs/common';
import { DrizzleService } from '@maniton/nestjs-common/database';
import { operations } from '@maniton/contracts/gen/drizzle/schema';
import { eq } from 'drizzle-orm';
@Injectable()
export class OperationRepository {
constructor(private readonly drizzle: DrizzleService) {}
async findById(id: string) {
const [operation] = await this.drizzle.db
.select()
.from(operations)
.where(eq(operations.id, id))
.limit(1);
return operation;
}
async findByIdempotencyKey(idempotencyKey: string) {
const [operation] = await this.drizzle.db
.select()
.from(operations)
.where(eq(operations.idempotencyKey, idempotencyKey))
.limit(1);
return operation;
}
async findByTxHash(txHash: string) {
const [operation] = await this.drizzle.db
.select()
.from(operations)
.where(eq(operations.onchainTxHash, txHash))
.limit(1);
return operation;
}
async create(data: typeof operations.$inferInsert) {
const [operation] = await this.drizzle.db
.insert(operations)
.values(data)
.returning();
return operation;
}
async update(id: string, data: Partial<typeof operations.$inferInsert>) {
const [operation] = await this.drizzle.db
.update(operations)
.set(data)
.where(eq(operations.id, id))
.returning();
return operation;
}
async list(pagination?: { limit?: number; offset?: number }) {
return this.drizzle.db
.select()
.from(operations)
.limit(pagination?.limit)
.offset(pagination?.offset);
}
}PostingRepository
// domains/ledger/domain/repositories/posting.repository.ts
import { Injectable } from '@nestjs/common';
import { DrizzleService } from '@maniton/nestjs-common/database';
import { postings } from '@maniton/contracts/gen/drizzle/schema';
import { eq } from 'drizzle-orm';
@Injectable()
export class PostingRepository {
constructor(private readonly drizzle: DrizzleService) {}
async create(data: typeof postings.$inferInsert) {
const [posting] = await this.drizzle.db
.insert(postings)
.values(data)
.returning();
return posting;
}
async findByOperationId(operationId: string) {
return this.drizzle.db
.select()
.from(postings)
.where(eq(postings.operationId, operationId));
}
async getBalance(subaccountId: string, currencyCode: string): Promise<number> {
const [result] = await this.drizzle.db
.select({
balance: sql<number>`COALESCE(SUM(CASE WHEN ${postings.debitSubaccountId} = ${subaccountId} THEN -${postings.amount} ELSE ${postings.amount} END), 0)`,
})
.from(postings)
.where(eq(postings.currencyCode, currencyCode));
return result?.balance || 0;
}
}HoldRepository
// domains/ledger/domain/repositories/hold.repository.ts
import { Injectable } from '@nestjs/common';
import { DrizzleService } from '@maniton/nestjs-common/database';
import { holds } from '@maniton/contracts/gen/drizzle/schema';
import { eq } from 'drizzle-orm';
@Injectable()
export class HoldRepository {
constructor(private readonly drizzle: DrizzleService) {}
async findById(id: string) {
const [hold] = await this.drizzle.db
.select()
.from(holds)
.where(eq(holds.id, id))
.limit(1);
return hold;
}
async create(data: typeof holds.$inferInsert) {
const [hold] = await this.drizzle.db
.insert(holds)
.values(data)
.returning();
return hold;
}
async update(id: string, data: Partial<typeof holds.$inferInsert>) {
const [hold] = await this.drizzle.db
.update(holds)
.set(data)
.where(eq(holds.id, id))
.returning();
return hold;
}
async list(pagination?: { limit?: number; offset?: number }) {
return this.drizzle.db
.select()
.from(holds)
.limit(pagination?.limit)
.offset(pagination?.offset);
}
async getBalance(subaccountId: string, currencyCode: string): Promise<number> {
const [result] = await this.drizzle.db
.select({
balance: sql<number>`COALESCE(SUM(CASE WHEN ${postings.debitSubaccountId} = ${subaccountId} THEN -${postings.amount} ELSE ${postings.amount} END), 0)`,
})
.from(postings)
.where(eq(postings.currencyCode, currencyCode));
return result?.balance || 0;
}
}Сервисы
EventPublisher
// domains/ledger/domain/services/event-publisher.ts
import { Injectable } from '@nestjs/common';
import { ProducerService } from '@maniton/nestjs-common/kafka';
import { create, toBinary } from '@bufbuild/protobuf';
import { CloudEventSchema, EventEnvelopeSchema } from '@maniton/contracts/gen/ts/maniton/events/v1/envelope_pb';
import { anyPack } from '@maniton/protobuf/wkt';
@Injectable()
export class EventPublisher {
constructor(private readonly producer: ProducerService) {}
async publish(topic: string, eventType: string, payload: any, context: any) {
const cloudEvent = create(CloudEventSchema, {
id: crypto.randomUUID(),
source: 'ledger-service',
specVersion: '1.0',
type: eventType,
dataContentType: 'application/protobuf',
data: {
case: 'protoData',
value: anyPack(eventType, payload),
},
});
const envelope = create(EventEnvelopeSchema, {
event: cloudEvent,
context,
topic,
schemaSubject: eventType,
});
const bytes = toBinary(EventEnvelopeSchema, envelope);
await this.producer.send(topic, bytes);
}
}Мониторинг
Метрики
// infrastructure/metrics/metrics.service.ts
import { Injectable } from '@nestjs/common';
import { Counter, Histogram, Gauge } from 'prom-client';
@Injectable()
export class MetricsService {
private readonly operationsTotal = new Counter({
name: 'ledger_operations_total',
help: 'Total number of ledger operations',
labelNames: ['type', 'status'],
});
private readonly holdsTotal = new Counter({
name: 'ledger_holds_total',
help: 'Total number of holds',
labelNames: ['status'],
});
private readonly operationDuration = new Histogram({
name: 'ledger_operation_duration_seconds',
help: 'Time spent on ledger operations',
labelNames: ['operation'],
buckets: [0.1, 0.5, 1, 2, 5, 10, 30, 60],
});
incrementOperation(type: string, status: string) {
this.operationsTotal.inc({ type, status });
}
incrementHold(status: string) {
this.holdsTotal.inc({ status });
}
recordOperationDuration(operation: string, duration: number) {
this.operationDuration.observe({ operation }, duration);
}
}Логи
// infrastructure/logging/logger.service.ts
import { Injectable } from '@nestjs/common';
@Injectable()
export class LoggerService {
private readonly context = 'LedgerService';
info(message: string, meta?: Record<string, any>) {
console.log(`[${this.context}] [INFO] ${message}`, meta);
}
error(message: string, error?: Error, meta?: Record<string, any>) {
if (error) {
console.error(`[${this.context}] [ERROR] ${message}`, error, meta);
} else {
console.error(`[${this.context}] [ERROR] ${message}`, meta);
}
}
warn(message: string, meta?: Record<string, any>) {
console.warn(`[${this.context}] [WARN] ${message}`, meta);
}
debug(message: string, meta?: Record<string, any>) {
if (process.env.LOG_LEVEL === 'debug') {
console.debug(`[${this.context}] [DEBUG] ${message}`, meta);
}
}
}Тестирование
Unit Tests
// domains/ledger/application/use-cases/create-operation.use-case.spec.ts
import { Test, TestingModule } from '@nestjs/testing';
import { CreateOperationUseCase } from './create-operation.use-case';
import { OperationRepository } from '../../domain/repositories/operation.repository';
import { PostingRepository } from '../../domain/repositories/posting.repository';
describe('CreateOperationUseCase', () => {
let useCase: CreateOperationUseCase;
let operationRepository: jest.Mocked<OperationRepository>;
let postingRepository: jest.Mocked<PostingRepository>;
beforeEach(async () => {
const module: TestingModule = await Test.createTestingModule({
providers: [
CreateOperationUseCase,
{
provide: OperationRepository,
useValue: {
findByIdempotencyKey: jest.fn(),
create: jest.fn(),
},
},
{
provide: PostingRepository,
useValue: {
create: jest.fn(),
},
},
],
}).compile();
useCase = module.get(CreateOperationUseCase);
operationRepository = module.get(OperationRepository);
postingRepository = module.get(PostingRepository);
});
it('should create operation', async () => {
const request = {
context: { requestId: '1', correlationId: '1', idempotencyKey: '1' },
operation: {
type: 'ISSUANCE',
status: 'PROCESSING',
userId: 'user-123',
idempotencyKey: '1',
},
postings: [
{
debitSubaccountId: 'user-123:CFA-RUB',
creditSubaccountId: 'system:reserve:CFA-RUB',
amount: { amount: '1000', currencyCode: 'RUB' },
},
],
};
const result = await useCase.execute(request);
expect(operationRepository.create).toHaveBeenCalled();
expect(postingRepository.create).toHaveBeenCalled();
});
});Troubleshooting
Проблема: Баланс не обновляется
Решение:
- Проверьте, что проводки созданы
- Проверьте, что subaccount IDs верные
- Проверьте SQL запрос для расчета баланса
Проблема: Холд не создается
Решение:
- Проверьте, что баланс достаточен
- Проверьте, что subaccount существует
- Проверьте логи HoldRepository