Reference
CFA Core Service - Реализация
Детальная реализация CFA Core Service
CFA Core Service - Реализация
Обзор
CFA Core Service — микросервис на NestJS для управления цифровыми финансовыми активами (ЦФА). Отвечает за выпуск, погашение и переводы ЦФА, а также за взаимодействие с блокчейном Hyperledger Besu.
Структура проекта
apps/services/cfa-core/src/
├── app/
│   ├── app.controller.ts
│   └── app.service.ts
├── app.module.ts
├── config/
│   └── config.module.ts
├── domains/
│   └── cfa/                      # Домен ЦФА
│       ├── application/
│       │   └── use-cases/
│       ├── domain/
│       │   ├── repositories/
│       │   ├── services/
│       │   └── entities/
│       └── presentation/
│           └── cfa-grpc.gateway.ts
├── infrastructure/
│   ├── database/
│   ├── kafka/
│   ├── besu/
│   └── logging/
├── main.ts
└── shared/
Модули
AppModule
// app.module.ts
import { Module } from '@nestjs/common';
import { ConfigModule } from '@nestjs/config';
import { DrizzleModule } from '@maniton/nestjs-common/database';
import { KafkaModule } from '@maniton/nestjs-common/kafka';
import { LoggingModule } from '@maniton/nestjs-common/logging';
import { CfaModule } from './domains/cfa/cfa.module';
/**
 * Root NestJS module of the CFA Core service.
 *
 * Registers global configuration, the Drizzle database module, the Kafka
 * module, the logging module and the CFA domain module. It declares no
 * controllers or providers of its own.
 */
@Module({
imports: [
// Configuration is registered as global and lists `.env.local` and `.env`
// as env files.
ConfigModule.forRoot({
isGlobal: true,
envFilePath: ['.env.local', '.env'],
}),
// NOTE(review): `process.env` is read while this decorator argument is being
// evaluated; presumably this relies on `ConfigModule.forRoot` above having
// already loaded the env files — confirm. `DATABASE_URL` has no fallback.
DrizzleModule.forRoot({
url: process.env.DATABASE_URL,
}),
// Brokers come from the comma-separated KAFKA_BROKERS variable and fall back
// to a single local broker when it is not set.
KafkaModule.forRoot({
brokers: process.env.KAFKA_BROKERS?.split(',') || ['localhost:9092'],
clientId: 'cfa-core',
}),
// Log level falls back to 'info' when LOG_LEVEL is unset or empty.
LoggingModule.forRoot({
level: process.env.LOG_LEVEL || 'info',
}),
CfaModule,
],
controllers: [],
providers: [],
})
export class AppModule {}Use Cases
IssueCfaRubUseCase
Выпуск CFA-RUB (денежных требований).
// domains/cfa/application/use-cases/issue-cfa-rub.use-case.ts
import { Injectable } from '@nestjs/common';
import { OperationRepository } from '../../domain/repositories/operation.repository';
import { IdentityClient } from '../../domain/clients/identity.client';
import { BesuCommand } from '../../domain/clients/besu.command';
import { EventPublisher } from '../../domain/services/event-publisher';
import {
  OperationStatus,
  OperationType,
  KycStatus,
} from '@maniton/contracts/gen/ts/maniton/common/v1/common_pb';

// NOTE(review): placeholder Kafka topic name — confirm the real topic before use.
const CFA_EVENTS_TOPIC = 'cfa.events';

// NOTE(review): `encodeFunctionData`, `cfaRubAbi`, `Address`, `web3Signer` and
// `configService` are referenced by the private helpers below but are neither
// imported nor injected in this snippet. The comment in `buildMintTx` says viem
// is used; the signer and config service still have to be wired in.

/**
 * Issues CFA-RUB (monetary claims) for a user.
 *
 * Flow: idempotency check → KYC check → limits check → create the operation →
 * build and sign the mint transaction → submit it to Besu → store the
 * transaction hash → publish `IssuanceStarted`.
 */
@Injectable()
export class IssueCfaRubUseCase {
  constructor(
    private readonly operationRepository: OperationRepository,
    private readonly identityClient: IdentityClient,
    private readonly besuCommand: BesuCommand,
    private readonly eventPublisher: EventPublisher,
  ) {}

  /**
   * Runs the issuance flow.
   *
   * @param request.context   Request context; `idempotencyKey` is read from it.
   * @param request.userId    User the tokens are minted for.
   * @param request.amount    Amount to issue; `amount.amount` is the value as a string.
   * @param request.paymentId Stored on the operation as `externalRef`.
   * @returns `{ operation }` — the existing operation for a repeated idempotency
   *          key, a REJECTED operation when KYC or limits fail, otherwise the
   *          operation after it was marked ONCHAIN_SUBMITTED.
   */
  async execute(request: {
    context: any;
    userId: string;
    amount: { amount: string; currencyCode: string };
    paymentId: string;
  }): Promise<any> {
    const { context, userId, amount, paymentId } = request;

    // 1. Idempotency: a repeated key returns the stored operation unchanged.
    // NOTE(review): lookup and insert are separate statements; concurrent
    // requests with the same key are not serialized here.
    const existing = await this.operationRepository.findByIdempotencyKey(context.idempotencyKey);
    if (existing) {
      return { operation: existing };
    }

    // 2. KYC check.
    const userProfile = await this.identityClient.getUserProfile(context, userId);
    if (userProfile.kycStatus < KycStatus.STANDARD) {
      return this.reject(request, 'KYC_REQUIRED', 'KYC level too low for issuance');
    }

    // 3. Limits check.
    const limitsCheck = await this.identityClient.checkLimits(
      context,
      userId,
      'DEPOSIT',
      amount,
      OperationType.ISSUANCE,
    );
    if (!limitsCheck.allowed) {
      return this.reject(request, 'LIMIT_EXCEEDED', limitsCheck.reason);
    }

    // 4. Create the operation.
    const operation = await this.operationRepository.create({
      type: OperationType.ISSUANCE,
      status: OperationStatus.PROCESSING,
      userId,
      idempotencyKey: context.idempotencyKey,
      externalRef: paymentId,
    });

    // 5. Build and sign the transaction.
    // NOTE(review): the address is derived from the user id by stripping
    // dashes; if `userId` is a UUID this gives 32 hex digits, which is shorter
    // than a 20-byte address — confirm how wallet addresses are resolved.
    const txPayload = this.buildMintTx({
      to: '0x' + userId.replace(/-/g, '').substring(0, 40),
      // `amount.amount` is already the string value; there is no nested `.value`.
      value: amount.amount,
    });
    const signedTx = await this.signTransaction(txPayload);

    // 6. Submit to the blockchain. `BesuCommand.submitTransaction` takes the
    // signed transaction under the `tx` key.
    // NOTE(review): if signing or submission throws, the operation stays in
    // PROCESSING; no failure status is set here.
    const txHash = await this.besuCommand.submitTransaction({
      context,
      tx: signedTx,
    });

    // 7. Store the hash. The repository identifies rows by `id`.
    const updated = await this.operationRepository.update(operation.id, {
      status: OperationStatus.ONCHAIN_SUBMITTED,
      onchainTxHash: txHash,
    });
    // Prefer the updated row so the event and the response carry the new status.
    const submitted = updated ?? operation;

    // 8. Publish the event using the (topic, eventType, payload, context) signature.
    await this.eventPublisher.publish(CFA_EVENTS_TOPIC, 'IssuanceStarted', submitted, context);

    return { operation: submitted };
  }

  /** Stores a REJECTED issuance operation with the given reason and returns it. */
  private async reject(
    request: { context: any; userId: string; paymentId: string },
    rejectionReason: string,
    rejectionMessage: string,
  ): Promise<any> {
    const operation = await this.operationRepository.create({
      type: OperationType.ISSUANCE,
      status: OperationStatus.REJECTED,
      userId: request.userId,
      idempotencyKey: request.context.idempotencyKey,
      externalRef: request.paymentId,
      rejectionReason,
      rejectionMessage,
    });
    return { operation };
  }

  /** Encodes the call data for `mint(to, value)` on the CFA-RUB contract. */
  private buildMintTx(params: { to: string; value: string }): string {
    // Generates the raw transaction for mint; uses viem to create it.
    // NOTE(review): `this.encodeFunctionData` is not a member of this class.
    return this.encodeFunctionData({
      abi: cfaRubAbi,
      functionName: 'mint',
      args: [params.to as Address, BigInt(params.value)],
    });
  }

  /** Signs the payload for the chain id configured as BESU_CHAIN_ID. */
  private async signTransaction(payload: string): Promise<string> {
    // Integration with HSM/Web3Signer for secure signing.
    const signature = await this.web3Signer.sign({
      payload,
      chainId: this.configService.get('BESU_CHAIN_ID'),
    });
    return signature;
  }
}
RedeemCfaRubUseCase
Погашение CFA-RUB.
// domains/cfa/application/use-cases/redeem-cfa-rub.use-case.ts
import { Injectable } from '@nestjs/common';
import { OperationRepository } from '../../domain/repositories/operation.repository';
import { IdentityClient } from '../../domain/clients/identity.client';
import { BesuCommand } from '../../domain/clients/besu.command';
import { EventPublisher } from '../../domain/services/event-publisher';
import {
  OperationStatus,
  OperationType,
  KycStatus,
} from '@maniton/contracts/gen/ts/maniton/common/v1/common_pb';

// NOTE(review): placeholder Kafka topic name — confirm the real topic before use.
const CFA_EVENTS_TOPIC = 'cfa.events';

// NOTE(review): `encodeFunctionData`, `cfaRubAbi`, `Address`, `web3Signer` and
// `configService` are referenced by the private helpers below but are neither
// imported nor injected in this snippet; they still have to be wired in.

/**
 * Redeems CFA-RUB for a user.
 *
 * Flow: idempotency check → KYC check → limits check → create the operation →
 * build and sign the burn transaction → submit it to Besu → store the
 * transaction hash → publish `RedemptionStarted`.
 */
@Injectable()
export class RedeemCfaRubUseCase {
  constructor(
    private readonly operationRepository: OperationRepository,
    private readonly identityClient: IdentityClient,
    private readonly besuCommand: BesuCommand,
    private readonly eventPublisher: EventPublisher,
  ) {}

  /**
   * Runs the redemption flow.
   *
   * @param request.context  Request context; `idempotencyKey` is read from it.
   * @param request.userId   User whose tokens are burned.
   * @param request.amount   Amount to redeem; `amount.amount` is the value as a string.
   * @param request.payoutId Stored on the operation as `externalRef`.
   * @returns `{ operation }` — the existing operation for a repeated idempotency
   *          key, a REJECTED operation when KYC or limits fail, otherwise the
   *          operation after it was marked ONCHAIN_SUBMITTED.
   */
  async execute(request: {
    context: any;
    userId: string;
    amount: { amount: string; currencyCode: string };
    payoutId: string;
  }): Promise<any> {
    const { context, userId, amount, payoutId } = request;

    // 1. Idempotency: a repeated key returns the stored operation unchanged.
    // NOTE(review): lookup and insert are separate statements; concurrent
    // requests with the same key are not serialized here.
    const existing = await this.operationRepository.findByIdempotencyKey(context.idempotencyKey);
    if (existing) {
      return { operation: existing };
    }

    // 2. KYC check.
    const userProfile = await this.identityClient.getUserProfile(context, userId);
    if (userProfile.kycStatus < KycStatus.STANDARD) {
      return this.reject(request, 'KYC_REQUIRED', 'KYC level too low for redemption');
    }

    // 3. Limits check.
    const limitsCheck = await this.identityClient.checkLimits(
      context,
      userId,
      'WITHDRAWAL',
      amount,
      OperationType.REDEMPTION,
    );
    if (!limitsCheck.allowed) {
      return this.reject(request, 'LIMIT_EXCEEDED', limitsCheck.reason);
    }

    // 4. Create the operation.
    const operation = await this.operationRepository.create({
      type: OperationType.REDEMPTION,
      status: OperationStatus.PROCESSING,
      userId,
      idempotencyKey: context.idempotencyKey,
      externalRef: payoutId,
    });

    // 5. Build and sign the transaction.
    // NOTE(review): the address is derived from the user id by stripping
    // dashes; if `userId` is a UUID this gives 32 hex digits, which is shorter
    // than a 20-byte address — confirm how wallet addresses are resolved.
    const txPayload = this.buildBurnTx({
      from: '0x' + userId.replace(/-/g, '').substring(0, 40),
      // `amount.amount` is already the string value; there is no nested `.value`.
      value: amount.amount,
    });
    const signedTx = await this.signTransaction(txPayload);

    // 6. Submit to the blockchain. `BesuCommand.submitTransaction` takes the
    // signed transaction under the `tx` key.
    // NOTE(review): if signing or submission throws, the operation stays in
    // PROCESSING; no failure status is set here.
    const txHash = await this.besuCommand.submitTransaction({
      context,
      tx: signedTx,
    });

    // 7. Store the hash. The repository identifies rows by `id`.
    const updated = await this.operationRepository.update(operation.id, {
      status: OperationStatus.ONCHAIN_SUBMITTED,
      onchainTxHash: txHash,
    });
    // Prefer the updated row so the event and the response carry the new status.
    const submitted = updated ?? operation;

    // 8. Publish the event using the (topic, eventType, payload, context) signature.
    await this.eventPublisher.publish(CFA_EVENTS_TOPIC, 'RedemptionStarted', submitted, context);

    return { operation: submitted };
  }

  /** Stores a REJECTED redemption operation with the given reason and returns it. */
  private async reject(
    request: { context: any; userId: string; payoutId: string },
    rejectionReason: string,
    rejectionMessage: string,
  ): Promise<any> {
    const operation = await this.operationRepository.create({
      type: OperationType.REDEMPTION,
      status: OperationStatus.REJECTED,
      userId: request.userId,
      idempotencyKey: request.context.idempotencyKey,
      externalRef: request.payoutId,
      rejectionReason,
      rejectionMessage,
    });
    return { operation };
  }

  /** Encodes the call data for `burn(from, value)` on the CFA-RUB contract. */
  private buildBurnTx(params: { from: string; value: string }): string {
    // Generates the raw transaction for burn.
    // NOTE(review): `this.encodeFunctionData` is not a member of this class.
    return this.encodeFunctionData({
      abi: cfaRubAbi,
      functionName: 'burn',
      args: [params.from as Address, BigInt(params.value)],
    });
  }

  /** Signs the payload for the chain id configured as BESU_CHAIN_ID. */
  private async signTransaction(payload: string): Promise<string> {
    // Integration with HSM/Web3Signer for secure signing.
    const signature = await this.web3Signer.sign({
      payload,
      chainId: this.configService.get('BESU_CHAIN_ID'),
    });
    return signature;
  }
}
TransferCfaUseCase
Перевод ЦФА между пользователями.
// domains/cfa/application/use-cases/transfer-cfa.use-case.ts
import { Injectable } from '@nestjs/common';
import { OperationRepository } from '../../domain/repositories/operation.repository';
import { BesuCommand } from '../../domain/clients/besu.command';
import { EventPublisher } from '../../domain/services/event-publisher';
import {
  OperationStatus,
  OperationType,
} from '@maniton/contracts/gen/ts/maniton/common/v1/common_pb';

// NOTE(review): placeholder Kafka topic name — confirm the real topic before use.
const CFA_EVENTS_TOPIC = 'cfa.events';

// NOTE(review): `encodeFunctionData`, `cfaRubAbi`, `Address`, `web3Signer` and
// `configService` are referenced by the private helpers below but are neither
// imported nor injected in this snippet; they still have to be wired in.

/**
 * Transfers CFA between two users.
 *
 * Flow: idempotency check → create the operation → build and sign the
 * transfer transaction → submit it to Besu → store the transaction hash →
 * publish `TransferStarted`. No KYC or limits check is performed here.
 */
@Injectable()
export class TransferCfaUseCase {
  constructor(
    private readonly operationRepository: OperationRepository,
    private readonly besuCommand: BesuCommand,
    private readonly eventPublisher: EventPublisher,
  ) {}

  /**
   * Runs the transfer flow.
   *
   * @param request.context      Request context; `idempotencyKey` is read from it.
   * @param request.instrumentId Instrument being transferred.
   * @param request.fromUserId   Sending user; stored as the operation's `userId`.
   * @param request.toUserId     Receiving user.
   * @param request.amount       Amount to transfer; `amount.amount` is the value as a string.
   * @param request.tradeId      Optional; stored on the operation as `externalRef`.
   * @returns `{ operation }` — the existing operation for a repeated idempotency
   *          key, otherwise the operation after it was marked ONCHAIN_SUBMITTED.
   */
  async execute(request: {
    context: any;
    instrumentId: string;
    fromUserId: string;
    toUserId: string;
    amount: { amount: string; currencyCode: string };
    tradeId?: string;
  }): Promise<any> {
    // NOTE(review): `instrumentId` is accepted but not used below; the
    // transaction is always encoded against `cfaRubAbi` — confirm.
    const { context, fromUserId, toUserId, amount, tradeId } = request;

    // 1. Idempotency: a repeated key returns the stored operation unchanged.
    const existing = await this.operationRepository.findByIdempotencyKey(context.idempotencyKey);
    if (existing) {
      return { operation: existing };
    }

    // 2. Create the operation.
    const operation = await this.operationRepository.create({
      type: OperationType.TRADING,
      status: OperationStatus.PROCESSING,
      userId: fromUserId,
      idempotencyKey: context.idempotencyKey,
      externalRef: tradeId,
    });

    // 3. Build and sign the transaction.
    // NOTE(review): addresses are derived from user ids by stripping dashes; if
    // the ids are UUIDs this gives 32 hex digits, which is shorter than a
    // 20-byte address — confirm how wallet addresses are resolved.
    const txPayload = this.buildTransferTx({
      from: '0x' + fromUserId.replace(/-/g, '').substring(0, 40),
      to: '0x' + toUserId.replace(/-/g, '').substring(0, 40),
      // `amount.amount` is already the string value; there is no nested `.value`.
      value: amount.amount,
    });
    const signedTx = await this.signTransaction(txPayload);

    // 4. Submit to the blockchain. `BesuCommand.submitTransaction` takes the
    // signed transaction under the `tx` key.
    // NOTE(review): if signing or submission throws, the operation stays in
    // PROCESSING; no failure status is set here.
    const txHash = await this.besuCommand.submitTransaction({
      context,
      tx: signedTx,
    });

    // 5. Store the hash. The repository identifies rows by `id`.
    const updated = await this.operationRepository.update(operation.id, {
      status: OperationStatus.ONCHAIN_SUBMITTED,
      onchainTxHash: txHash,
    });
    // Prefer the updated row so the event and the response carry the new status.
    const submitted = updated ?? operation;

    // 6. Publish the event using the (topic, eventType, payload, context) signature.
    await this.eventPublisher.publish(CFA_EVENTS_TOPIC, 'TransferStarted', submitted, context);

    return { operation: submitted };
  }

  /** Encodes the call data for `transfer(to, value)` on the contract. */
  private buildTransferTx(params: { from: string; to: string; value: string }): string {
    // Generates the raw transaction for transfer.
    // NOTE(review): `params.from` is not encoded. With a standard ERC-20
    // `transfer` the sender is the signing account, not `from` — confirm
    // whether `transferFrom` or an operator transfer is intended.
    // NOTE(review): `this.encodeFunctionData` is not a member of this class.
    return this.encodeFunctionData({
      abi: cfaRubAbi,
      functionName: 'transfer',
      args: [params.to as Address, BigInt(params.value)],
    });
  }

  /** Signs the payload for the chain id configured as BESU_CHAIN_ID. */
  private async signTransaction(payload: string): Promise<string> {
    // Integration with HSM/Web3Signer for secure signing.
    const signature = await this.web3Signer.sign({
      payload,
      chainId: this.configService.get('BESU_CHAIN_ID'),
    });
    return signature;
  }
}
Connect RPC Gateway
CfaGrpcGateway
// domains/cfa/presentation/cfa-grpc.gateway.ts
import { ConnectGateway, ConnectRouter } from '@maniton/nestjs-common/connect';
import { CfaRegistryService } from '@maniton/contracts/gen/ts/maniton/cfa/v1/cfa_pb';

/** Minimal shape the gateway needs from a use case: a single `execute` call. */
interface CfaUseCase {
  // eslint-disable-next-line @typescript-eslint/no-explicit-any -- request/response types come from the generated service definition
  execute(request: any): Promise<any>;
}

/**
 * Use cases the gateway dispatches to, one per RPC of `CfaRegistryService`.
 *
 * Declared in this file because the original snippet imported `CfaUseCases`
 * from './cfa-grpc.gateway', i.e. from this very module.
 */
export interface CfaUseCases {
  registerInstrument: CfaUseCase;
  issueCfaRub: CfaUseCase;
  redeemCfaRub: CfaUseCase;
  issueInvestmentCfa: CfaUseCase;
  transferCfa: CfaUseCase;
  listHolders: CfaUseCase;
  checkIntegrity: CfaUseCase;
}

/**
 * Connect RPC gateway that exposes `CfaRegistryService` under the
 * '/cfa-service' path prefix and forwards every RPC to its use case.
 */
export class CfaGrpcGateway extends ConnectGateway {
  protected readonly options = {
    requestPathPrefix: '/cfa-service',
  };

  /**
   * @param useCases Use cases the routes delegate to.
   *
   * NOTE(review): assumes `ConnectGateway` has a no-argument constructor and
   * that `useCases` is supplied by the module wiring (for example through a
   * factory provider) — confirm.
   */
  constructor(private readonly useCases: CfaUseCases) {
    super();
  }

  /** Registers one handler per RPC; each handler passes the request through unchanged. */
  protected registerRoutes(router: ConnectRouter): void {
    router.service(CfaRegistryService, {
      registerInstrument: (req) => this.useCases.registerInstrument.execute(req),
      issueCfaRub: (req) => this.useCases.issueCfaRub.execute(req),
      redeemCfaRub: (req) => this.useCases.redeemCfaRub.execute(req),
      issueInvestmentCfa: (req) => this.useCases.issueInvestmentCfa.execute(req),
      transferCfa: (req) => this.useCases.transferCfa.execute(req),
      listHolders: (req) => this.useCases.listHolders.execute(req),
      checkIntegrity: (req) => this.useCases.checkIntegrity.execute(req),
    });
  }
}
Репозитории
OperationRepository
// domains/cfa/domain/repositories/operation.repository.ts
import { Injectable } from '@nestjs/common';
import { DrizzleService } from '@maniton/nestjs-common/database';
import { operations } from '@maniton/contracts/gen/drizzle/schema';
import { eq } from 'drizzle-orm';

/**
 * Data access for the `operations` table via Drizzle.
 *
 * The single-row finders return the first matching row, or `undefined` when
 * the query yields no rows.
 */
@Injectable()
export class OperationRepository {
  constructor(private readonly drizzle: DrizzleService) {}

  /** Finds an operation by its primary `id`. */
  async findById(id: string) {
    const [operation] = await this.drizzle.db
      .select()
      .from(operations)
      .where(eq(operations.id, id))
      .limit(1);
    return operation;
  }

  /** Finds an operation by its idempotency key. */
  async findByIdempotencyKey(idempotencyKey: string) {
    const [operation] = await this.drizzle.db
      .select()
      .from(operations)
      .where(eq(operations.idempotencyKey, idempotencyKey))
      .limit(1);
    return operation;
  }

  /** Finds an operation by the hash of its on-chain transaction. */
  async findByTxHash(txHash: string) {
    const [operation] = await this.drizzle.db
      .select()
      .from(operations)
      .where(eq(operations.onchainTxHash, txHash))
      .limit(1);
    return operation;
  }

  /** Inserts an operation and returns the stored row. */
  async create(data: typeof operations.$inferInsert) {
    const [operation] = await this.drizzle.db.insert(operations).values(data).returning();
    return operation;
  }

  /**
   * Applies a partial update to the operation with the given `id`.
   *
   * @returns The updated row, or `undefined` when no row matched.
   */
  async update(id: string, data: Partial<typeof operations.$inferInsert>) {
    const [operation] = await this.drizzle.db
      .update(operations)
      .set(data)
      .where(eq(operations.id, id))
      .returning();
    return operation;
  }

  /**
   * Lists operations, optionally paginated.
   *
   * `limit` and `offset` are applied only when they are provided, so
   * `undefined` is never handed to the query builder. Calling without
   * pagination still returns every row.
   *
   * NOTE(review): no ORDER BY is applied, so page contents are not guaranteed
   * to be stable between calls — consider adding one.
   */
  async list(pagination?: { limit?: number; offset?: number }) {
    // `$dynamic()` lets clauses be added conditionally after the query is built.
    let query = this.drizzle.db.select().from(operations).$dynamic();
    if (pagination?.limit !== undefined) {
      query = query.limit(pagination.limit);
    }
    if (pagination?.offset !== undefined) {
      query = query.offset(pagination.offset);
    }
    return query;
  }
}
Клиенты
IdentityClient
// domains/cfa/domain/clients/identity.client.ts
import { Injectable, OnModuleDestroy } from '@nestjs/common';
import { ClientProxy, ClientProxyFactory, Transport } from '@nestjs/microservices';

/**
 * Kafka-based client for the identity service: user profiles and limit checks.
 *
 * NOTE(review): requests use object patterns (`{ cmd: ... }`) and the client is
 * never explicitly connected or subscribed to reply topics — confirm this
 * matches how the Kafka transport is expected to be used.
 */
@Injectable()
export class IdentityClient implements OnModuleDestroy {
  private readonly client: ClientProxy;

  constructor() {
    // Brokers come from the comma-separated KAFKA_BROKERS variable, falling
    // back to a single local broker.
    this.client = ClientProxyFactory.create({
      transport: Transport.KAFKA,
      options: {
        client: {
          kafka: {
            brokers: process.env.KAFKA_BROKERS?.split(',') || ['localhost:9092'],
          },
        },
      },
    });
  }

  /** Closes the underlying client so its connection does not outlive the module. */
  async onModuleDestroy(): Promise<void> {
    await this.client.close();
  }

  /**
   * Requests the profile of `userId` from the identity service.
   *
   * NOTE(review): `toPromise()` is deprecated in RxJS 7 and can resolve to
   * `undefined`; consider `firstValueFrom` once rxjs is imported here.
   */
  async getUserProfile(context: any, userId: string) {
    return this.client.send({ cmd: 'getUserProfile' }, { context, userId }).toPromise();
  }

  /**
   * Asks the identity service whether the operation fits the user's limits.
   *
   * @param operationType Accepts a string or a numeric enum value; the use
   *                      cases pass `OperationType` enum members.
   */
  async checkLimits(
    context: any,
    userId: string,
    limitType: string,
    amount: any,
    operationType: string | number,
  ) {
    return this.client
      .send({ cmd: 'checkLimits' }, { context, userId, limitType, amount, operationType })
      .toPromise();
  }
}
BesuCommand
// domains/cfa/domain/clients/besu.command.ts
import { Injectable, OnModuleDestroy } from '@nestjs/common';
import { ClientProxy, ClientProxyFactory, Transport } from '@nestjs/microservices';

/**
 * Kafka-based client for blockchain commands: submitting transactions and
 * reading receipts and blocks.
 *
 * NOTE(review): requests use object patterns (`{ cmd: ... }`) and the client is
 * never explicitly connected or subscribed to reply topics — confirm this
 * matches how the Kafka transport is expected to be used.
 */
@Injectable()
export class BesuCommand implements OnModuleDestroy {
  private readonly client: ClientProxy;

  constructor() {
    // Brokers come from the comma-separated KAFKA_BROKERS variable, falling
    // back to a single local broker.
    this.client = ClientProxyFactory.create({
      transport: Transport.KAFKA,
      options: {
        client: {
          kafka: {
            brokers: process.env.KAFKA_BROKERS?.split(',') || ['localhost:9092'],
          },
        },
      },
    });
  }

  /** Closes the underlying client so its connection does not outlive the module. */
  async onModuleDestroy(): Promise<void> {
    await this.client.close();
  }

  /**
   * Sends a signed transaction for submission.
   *
   * @param payload.tx Signed transaction; note the key is `tx`, not `payload`.
   *
   * NOTE(review): `toPromise()` is deprecated in RxJS 7 and can resolve to
   * `undefined`; consider `firstValueFrom` once rxjs is imported here.
   */
  async submitTransaction(payload: { context: any; tx: string }) {
    return this.client.send({ cmd: 'submitTransaction' }, payload).toPromise();
  }

  /** Requests the receipt of the transaction with the given hash. */
  async getTransactionReceipt(txHash: string) {
    return this.client.send({ cmd: 'getTransactionReceipt' }, { txHash }).toPromise();
  }

  /** Requests the block with the given number. */
  async getBlock(number: number) {
    return this.client.send({ cmd: 'getBlock' }, { number }).toPromise();
  }
}
Сервисы
EventPublisher
// domains/cfa/domain/services/event-publisher.ts
import { randomUUID } from 'node:crypto';
import { Injectable } from '@nestjs/common';
import { ProducerService } from '@maniton/nestjs-common/kafka';
import { create, toBinary } from '@bufbuild/protobuf';
import {
  CloudEventSchema,
  EventEnvelopeSchema,
} from '@maniton/contracts/gen/ts/maniton/events/v1/envelope_pb';
import { anyPack } from '@maniton/protobuf/wkt';

/** Publishes domain events to Kafka as binary-encoded event envelopes. */
@Injectable()
export class EventPublisher {
  constructor(private readonly producer: ProducerService) {}

  /**
   * Wraps `payload` in a CloudEvent and an envelope, serializes the envelope
   * and sends the bytes to `topic`.
   *
   * @param topic     Topic to send to; also recorded in the envelope.
   * @param eventType Used as the CloudEvent type, as the envelope's
   *                  `schemaSubject` and as the first argument of `anyPack`.
   * @param payload   Event body passed to `anyPack`.
   * @param context   Stored on the envelope as `context`.
   */
  async publish(topic: string, eventType: string, payload: any, context: any) {
    const cloudEvent = create(CloudEventSchema, {
      // Imported from node:crypto instead of relying on the global `crypto`,
      // which is not available without a flag on Node 18.
      id: randomUUID(),
      source: 'cfa-core',
      specVersion: '1.0',
      type: eventType,
      dataContentType: 'application/protobuf',
      data: {
        case: 'protoData',
        value: anyPack(eventType, payload),
      },
    });
    const envelope = create(EventEnvelopeSchema, {
      event: cloudEvent,
      context,
      topic,
      schemaSubject: eventType,
    });
    const bytes = toBinary(EventEnvelopeSchema, envelope);
    await this.producer.send(topic, bytes);
  }
}
Мониторинг
Метрики
// infrastructure/metrics/metrics.service.ts
import { Injectable } from '@nestjs/common';
import { Counter, Histogram, Gauge } from 'prom-client';

/**
 * Prometheus metrics for CFA operations: counters for issuances, redemptions
 * and transfers, plus a histogram of operation durations in seconds.
 */
@Injectable()
export class MetricsService {
  /** Issuances, labelled by `type` and `status`. */
  private readonly issuancesTotal = new Counter({
    name: 'cfa_issuances_total',
    help: 'Total number of CFA issuances',
    labelNames: ['type', 'status'],
  });

  /** Redemptions, labelled by `type` and `status`. */
  private readonly redemptionsTotal = new Counter({
    name: 'cfa_redemptions_total',
    help: 'Total number of CFA redemptions',
    labelNames: ['type', 'status'],
  });

  /** Transfers, labelled by `instrument_id`. */
  private readonly transfersTotal = new Counter({
    name: 'cfa_transfers_total',
    help: 'Total number of CFA transfers',
    labelNames: ['instrument_id'],
  });

  /** Operation durations, labelled by `operation`; buckets run from 1 to 600. */
  private readonly operationDuration = new Histogram({
    name: 'cfa_operation_duration_seconds',
    help: 'Time spent on CFA operations',
    labelNames: ['operation'],
    buckets: [1, 5, 10, 30, 60, 120, 300, 600],
  });

  /** Adds one issuance for the given type and status. */
  incrementIssuance(type: string, status: string): void {
    // Label values are given in the order of `labelNames`.
    this.issuancesTotal.labels(type, status).inc();
  }

  /** Adds one redemption for the given type and status. */
  incrementRedemption(type: string, status: string): void {
    this.redemptionsTotal.labels(type, status).inc();
  }

  /** Adds one transfer for the given instrument. */
  incrementTransfer(instrumentId: string): void {
    this.transfersTotal.labels(instrumentId).inc();
  }

  /** Records one observed duration for the named operation. */
  recordOperationDuration(operation: string, duration: number): void {
    this.operationDuration.labels(operation).observe(duration);
  }
}
Troubleshooting
Проблема: Транзакция не проходит в блокчейне
Решение:
- Проверьте, что лимита газа (gas limit) достаточно для транзакции
- Проверьте, что nonce верный
- Проверьте, что контракт существует
- Проверьте логи BesuCommand
Проблема: Операция не финализируется
Решение:
- Проверьте, что транзакция прошла в блокчейне
- Проверьте, что receipt получен
- Проверьте логи EventPublisher