Ledger-DB Service
Система операционного учета и двойной записи
Ledger-DB Service
Статус: Реализован. Система операционного учета с двойной записью и полной интеграцией с DLT.
Ledger-DB — это критический компонент платформы, отвечающий за операционный учет денежных средств и ЦФА. Он реализует модель двойной записи (Double-Entry Bookkeeping).
Архитектура учета
Система учета построена на следующих сущностях:
- Account: Глобальный счет пользователя или системы.
- SubAccount: Субсчет для конкретного актива (RUB, CFA-EQ, и т.д.).
- Operation: Группировка проводок, представляющая собой одну бизнес-операцию (например, "Торговля").
- Posting: Атомарная запись о движении средств между двумя субсчетами.
- Hold: Временная блокировка средств (например, под открытый ордер).
Технологический стек
- Фреймворк: NestJS
- База данных: PostgreSQL
- ORM: Drizzle ORM
- Связь: gRPC (для синхронных проверок) + Kafka (для асинхронного обновления).
Принципы работы
- Атомарность: Все проводки внутри одной
Operationвыполняются в одной транзакции БД. - Неизменяемость: Существующие проводки нельзя удалять или изменять. Для исправления ошибок используются сторнирующие (reversing) записи.
- Double Contour: Ledger-DB является проекцией данных из DLT (Besu), но также хранит промежуточные состояния (Hold), которых нет в блокчейне.
Модель данных (Entity-Relationship)
Модель данных (Protobuf)
message Account {
string account_id = 1;
string user_id = 2;
AccountType type = 3; // MAIN / INVEST / BLOCKED
}
message SubAccount {
string subaccount_id = 1;
string account_id = 2;
string instrument_id = 3; // CFA-RUB / CFA-EQ-001 / ...
Decimal balance = 4;
SubAccountStatus status = 5;
}
message Operation {
string operation_id = 1;
OperationType type = 2;
OperationStatus status = 3;
string user_id = 4;
string idempotency_key = 5;
string external_ref = 6;
string onchain_tx_hash = 7;
}
message Posting {
string posting_id = 1;
string operation_id = 2;
string debit_subaccount_id = 3;
string credit_subaccount_id = 4;
MonetaryAmount amount = 5;
}
message Hold {
string hold_id = 1;
string subaccount_id = 2;
MonetaryAmount amount = 3;
HoldStatus status = 4; // ACTIVE / RELEASED / CONSUMED / EXPIRED
string reason = 5;
}Пример проводок
Пополнение (Deposit):
Debit: System.Reserve.CFA-RUB +1000
Credit: User123.Main.CFA-RUB +1000Покупка инвест-ЦФА:
Debit: User123.Main.CFA-RUB -5000
Credit: System.Reserve.CFA-RUB +5000
Debit: System.Reserve.CFA-EQ-001 -10 shares
Credit: User123.Invest.CFA-EQ-001 +10 sharesKafka RPC API
Ledger использует Kafka RPC с reply-топиками *.reply.
| Topic | Proto | Описание |
|---|---|---|
ledger.getAccount | GetAccountRequest/Response | Получить счет |
ledger.listAccounts | ListAccountsRequest/Response | Список счетов пользователя |
ledger.getSubAccount | GetSubAccountRequest/Response | Получить субсчет |
ledger.createOperation | CreateOperationRequest/Response | Создать операцию с проводками |
ledger.getOperation | GetOperationRequest/Response | Получить операцию |
ledger.listOperations | ListOperationsRequest/Response | Список операций |
ledger.createHold | CreateHoldRequest/Response | Создать блокировку |
ledger.releaseHold | ReleaseHoldRequest/Response | Освободить блокировку |
ledger.consumeHold | ConsumeHoldRequest/Response | Списать блокировку |
События
Исходящие (maniton.ledger.events.v1)
OperationCreatedOperationStatusChangedBalanceUpdatedHoldCreatedHoldReleasedHoldConsumedUserOnchainRegistrationRequestedUserOnchainRegistrationConfirmed
Входящие
maniton.identity.events.v1:UserCreated,UserBlocked,UserUnblockedmaniton.cfa.events.v1:CfaIssued,CfaTransferred,CfaRedeemedmaniton.besu.events.v1:TransactionReceiptReadyEvent,BlockReadEvent
Сверка с DLT (Reconciliation)
Ledger использует события Besu для подтверждения финальности операций:
- При получении
TransactionReceiptReadyEventоперация переводится вFINALIZED. - На
BlockReadEventможет запускаться reconciliation-процесс для восстановления пропущенных операций.
API (gRPC Connect)
LedgerService
GetAccount
Получение счета пользователя.
message GetAccountRequest {
RequestContext context = 1;
string account_id = 2;
}
message GetAccountResponse {
Account account = 1;
}ListAccounts
Получение списка счетов пользователя.
message ListAccountsRequest {
RequestContext context = 1;
string user_id = 2;
PaginationRequest pagination = 3;
}
message ListAccountsResponse {
repeated Account accounts = 1;
PaginationResponse pagination = 2;
}GetSubAccount
Получение субсчета.
message GetSubAccountRequest {
RequestContext context = 1;
string subaccount_id = 2;
}
message GetSubAccountResponse {
SubAccount subaccount = 1;
}CreateOperation
Создание операции с проводками.
message CreateOperationRequest {
RequestContext context = 1;
Operation operation = 2;
repeated Posting postings = 3;
}
message CreateOperationResponse {
Operation operation = 1;
repeated Posting postings = 2;
}GetOperation
Получение операции.
message GetOperationRequest {
RequestContext context = 1;
string operation_id = 2;
}
message GetOperationResponse {
Operation operation = 1;
repeated Posting postings = 2;
}ListOperations
Получение списка операций пользователя.
message ListOperationsRequest {
RequestContext context = 1;
string user_id = 2;
OperationType type = 3;
OperationStatus status = 4;
PaginationRequest pagination = 5;
}
message ListOperationsResponse {
repeated Operation operations = 1;
PaginationResponse pagination = 2;
}CreateHold
Создание блокировки средств.
message CreateHoldRequest {
RequestContext context = 1;
Hold hold = 2;
}
message CreateHoldResponse {
Hold hold = 1;
}ReleaseHold
Освобождение блокировки.
message ReleaseHoldRequest {
RequestContext context = 1;
string hold_id = 2;
string reason = 3;
}
message ReleaseHoldResponse {
Hold hold = 1;
}ConsumeHold
Потребление блокированных средств.
message ConsumeHoldRequest {
RequestContext context = 1;
string hold_id = 2;
}
message ConsumeHoldResponse {
Hold hold = 1;
}События Kafka
Исходящие (maniton.ledger.events.v1)
OperationCreatedEvent
message OperationCreatedEvent {
RequestContext context = 1;
string operation_id = 2;
OperationType type = 3;
string user_id = 4;
MonetaryAmount amount = 5;
google.protobuf.Timestamp created_at = 6;
}BalanceUpdatedEvent
message BalanceUpdatedEvent {
RequestContext context = 1;
string subaccount_id = 2;
Decimal old_balance = 3;
Decimal new_balance = 4;
string operation_id = 5;
google.protobuf.Timestamp updated_at = 6;
}HoldCreatedEvent
message HoldCreatedEvent {
RequestContext context = 1;
string hold_id = 2;
string subaccount_id = 3;
MonetaryAmount amount = 4;
string reason = 5;
google.protobuf.Timestamp created_at = 6;
}HoldReleasedEvent
message HoldReleasedEvent {
RequestContext context = 1;
string hold_id = 2;
string reason = 3;
google.protobuf.Timestamp released_at = 4;
}HoldConsumedEvent
message HoldConsumedEvent {
RequestContext context = 1;
string hold_id = 2;
google.protobuf.Timestamp consumed_at = 3;
}Входящие
maniton.identity.events.v1:UserCreated,UserBlocked,UserUnblockedmaniton.cfa.events.v1:CfaIssued,CfaTransferred,CfaRedeemedmaniton.besu.events.v1:TransactionReceiptReadyEvent,BlockReadEvent
Use Cases
CreateOperationUseCase
Создание операции с проводками в рамках транзакции БД.
async execute(request: CreateOperationRequest): Promise<CreateOperationResponse> {
const { context, operation, postings } = request;
// 1. Валидация проводок
this.validatePostings(postings);
// 2. Создание операции и проводок
return await this.unitOfWork.executeWithEvents(async (tx: DbTransaction) => {
// Создание операции
const created = await this.operationRepository.save(operation, tx);
// Создание проводок
const createdPostings = await Promise.all(
postings.map(p => this.postingRepository.save(p, tx))
);
// Публикация событий
await this.eventPublisher.publish({
type: 'OperationCreated',
payload: created,
});
for (const posting of createdPostings) {
await this.eventPublisher.publish({
type: 'PostingCreated',
payload: posting,
});
}
return {
operation: created,
postings: createdPostings,
};
});
}CreateHoldUseCase
Создание блокировки средств.
async execute(request: CreateHoldRequest): Promise<CreateHoldResponse> {
const { context, hold } = request;
// 1. Проверка баланса
const balance = await this.subAccountRepository.getBalance(hold.subaccountId);
if (balance < hold.amount.amount.value) {
throw new Error('Insufficient balance');
}
// 2. Создание холда
const created = await this.holdRepository.save(hold);
// 3. Публикация события
await this.eventPublisher.publish({
type: 'HoldCreated',
payload: created,
});
return { hold: created };
}ConsumeHoldUseCase
Потребление блокированных средств.
async execute(request: ConsumeHoldRequest): Promise<ConsumeHoldResponse> {
const { context, holdId } = request;
// 1. Получение холда
const hold = await this.holdRepository.findById(holdId);
if (!hold) {
throw new Error('Hold not found');
}
if (hold.status !== HoldStatus.ACTIVE) {
throw new Error('Hold is not active');
}
// 2. Проверка срока действия
if (hold.expiresAt && new Date() > hold.expiresAt) {
hold.status = HoldStatus.EXPIRED;
await this.holdRepository.save(hold);
await this.eventPublisher.publish({
type: 'HoldReleased',
payload: {
holdId: hold.holdId,
reason: 'Expired',
},
});
throw new Error('Hold has expired');
}
// 3. Потребление холда
hold.status = HoldStatus.CONSUMED;
await this.holdRepository.save(hold);
// 4. Публикация события
await this.eventPublisher.publish({
type: 'HoldConsumed',
payload: hold,
});
return { hold };
}Модель данных
Account
Счет пользователя для хранения активов.
class Account {
accountId: string;
userId: string;
type: AccountType; // MAIN, INVEST, BLOCKED
createdAt: Date;
updatedAt: Date;
}
enum AccountType {
MAIN = 'MAIN',
INVEST = 'INVEST',
BLOCKED = 'BLOCKED',
}SubAccount
Субсчет для конкретного актива.
class SubAccount {
subaccountId: string;
accountId: string;
instrumentId: string;
balance: string; // Decimal
status: SubAccountStatus;
createdAt: Date;
updatedAt: Date;
}
enum SubAccountStatus {
ACTIVE = 'ACTIVE',
FROZEN = 'FROZEN',
CLOSED = 'CLOSED',
}Operation
Операция над активами.
class Operation {
operationId: string;
type: OperationType;
status: OperationStatus;
userId: string;
idempotencyKey: string;
externalRef?: string;
onchainTxHash?: string;
rejectionReason?: RejectionReasonCode;
rejectionMessage?: string;
createdAt: Date;
finalizedAt?: Date;
}
enum OperationType {
DEPOSIT = 'DEPOSIT',
WITHDRAWAL = 'WITHDRAWAL',
P2P_TRANSFER = 'P2P_TRANSFER',
PAYMENT = 'PAYMENT',
ISSUANCE = 'ISSUANCE',
REDEMPTION = 'REDEMPTION',
TRADE_BUY = 'TRADE_BUY',
TRADE_SELL = 'TRADE_SELL',
DIVIDEND = 'DIVIDEND',
COUPON = 'COUPON',
EXTERNAL_WITHDRAWAL = 'EXTERNAL_WITHDRAWAL',
EXTERNAL_TRANSFER = 'EXTERNAL_TRANSFER',
TRADING = 'TRADING',
FEE = 'FEE',
}
enum OperationStatus {
CREATED = 'CREATED',
KYC_REQUIRED = 'KYC_REQUIRED',
COMPLIANCE_CHECK = 'COMPLIANCE_CHECK',
AWAITING_PAYMENT = 'AWAITING_PAYMENT',
PROCESSING = 'PROCESSING',
ONCHAIN_SUBMITTED = 'ONCHAIN_SUBMITTED',
FINALIZED = 'FINALIZED',
SETTLED = 'SETTLED',
REJECTED = 'REJECTED',
BLOCKED = 'BLOCKED',
FAILED = 'FAILED',
REFUNDED = 'REFUNDED',
}Posting
Бухгалтерская проводка.
class Posting {
postingId: string;
operationId: string;
debitSubaccountId: string;
creditSubaccountId: string;
amount: MonetaryAmount;
createdAt: Date;
}Hold
Блокировка средств.
class Hold {
holdId: string;
subaccountId: string;
amount: MonetaryAmount;
status: HoldStatus;
reason: string;
createdAt: Date;
expiresAt?: Date;
}
enum HoldStatus {
ACTIVE = 'ACTIVE',
RELEASED = 'RELEASED',
CONSUMED = 'CONSUMED',
EXPIRED = 'EXPIRED',
}Мониторинг
Метрики
import { Counter, Histogram, Gauge } from 'prom-client';
export const operationsTotal = new Counter({
name: 'ledger_operations_total',
help: 'Total number of ledger operations',
labelNames: ['type', 'status'],
});
export const postingsTotal = new Counter({
name: 'ledger_postings_total',
help: 'Total number of postings',
});
export const holdsTotal = new Gauge({
name: 'ledger_holds_total',
help: 'Total number of active holds',
});
export const operationDuration = new Histogram({
name: 'ledger_operation_duration_seconds',
help: 'Time spent on ledger operations',
buckets: [0.1, 0.5, 1, 2, 5, 10, 30, 60],
});Логи
this.logger.log('Operation created', {
operationId: operation.operationId,
type: operation.type,
userId: operation.userId,
amount: operation.amount,
postingsCount: postings.length,
});
this.logger.log('Hold created', {
holdId: hold.holdId,
subaccountId: hold.subaccountId,
amount: hold.amount,
reason: hold.reason,
});
this.logger.log('Hold consumed', {
holdId: hold.holdId,
subaccountId: hold.subaccountId,
amount: hold.amount,
});Troubleshooting
Проблема: Балансы не совпадают
Диагностика:
# Проверка баланса пользователя
curl http://ledger-service:3003/accounts/$ACCOUNT_ID
# Проверка операций
curl http://ledger-service:3003/operations?user_id=$USER_ID
# Проверка проводок
curl http://ledger-service:3003/postings?operation_id=$OPERATION_IDРешение:
- Проверьте, что все проводки созданы
- Проверьте, что операции финализированы
- Запустите reconciliation процесс
Проблема: Холды не освобождаются
Диагностика:
# Проверка статуса холда
curl http://ledger-service:3003/holds/$HOLD_ID
# Проверка срока действия
curl http://ledger-service:3003/holds?status=EXPIREDРешение:
- Проверьте, что срок действия не истек
- Проверьте, что холд не был уже потреблен
- Ручное освобождение если нужно