Каталог Kafka топиков
Полный список и описание топиков в системе
Каталог Kafka топиков
Все взаимодействие между сервисами в платформе Манитон происходит через единую шину событий Kafka. Топики разделены по доменным областям.
Правила именования
Формат: maniton.<domain>.events.v1
maniton— префикс организации.<domain>— доменная область (identity, ledger, besu и т.д.).events— тип данных.v1— версия схемы.
Список топиков
| Топик | Описание | Основные типы событий |
|---|---|---|
maniton.identity.events.v1 | События KYC и пользователей | UserCreated, KycVerified, LimitChanged |
maniton.ledger.events.v1 | Операционный учет и балансы | BalanceUpdated, OperationCreated, HoldCreated |
maniton.cfa.events.v1 | Жизненный цикл ЦФА | CfaIssued, CfaTransferred, CfaRedeemed |
maniton.besu.events.v1 | Команды и события DLT | SubmitTx, BlockRead, TxReceiptReady |
maniton.payments.events.v1 | Платежи и СБП | FiatDeposited, WithdrawalRequested |
maniton.market.events.v1 | Торговля и ордера | OrderPlaced, TradeExecuted, OrderCancelled |
maniton.audit.logs.v1 | Журнал аудита | SecurityEvent, AdminAction |
Формат сообщения (EventEnvelope)
Каждое сообщение в Kafka оборачивается в структуру EventEnvelope, которая соответствует спецификации CloudEvents и обогащена контекстом запроса.
message EventEnvelope {
// Уникальный ID события (UUID)
string id = 1;
// Источник (например, "maniton.service.ledger")
string source = 2;
// Тип события (например, "maniton.ledger.v1.BalanceUpdated")
string type = 3;
// Время возникновения
google.protobuf.Timestamp time = 4;
// Сами данные (сериализованный Protobuf)
bytes data = 5;
// Контекст запроса для трассировки и идемпотентности
RequestContext context = 6;
}RequestContext
Используется для сквозной трассировки (distributed tracing) и обеспечения идемпотентности во всей системе.
message RequestContext {
string request_id = 1; // UUID запроса
string correlation_id = 2; // UUID для связи запрос-ответ
string idempotency_key = 3; // Ключ идемпотентности
string trace_id = 5; // OpenTelemetry trace_id
string user_id = 6; // Инициатор (если есть)
string client_ip = 8;
Contour contour = 9; // RF (РФ) или EXTERNAL (Внешний)
}Гарантии доставки
Платформа использует режим At-Least-Once доставки сообщений. Это означает:
- Обработчики (Consumers) обязаны быть идемпотентными.
- Использование
idempotency_keyиз контекста обязательно для всех записей в БД. - Повторная обработка события с тем же ID не должна приводить к повторным начислениям или списаниям.
Детальное описание топиков
maniton.identity.events.v1
Назначение: События жизненного цикла пользователей и KYC
Основные события:
| Событие | Описание | Поля |
|---|---|---|
UserCreated | Создание нового пользователя | user_id, email, phone |
UserUpdated | Обновление профиля пользователя | user_id, changes |
KycSubmitted | Подача KYC заявки | case_id, user_id, payload |
KycStatusChanged | Изменение статуса KYC | user_id, old_status, new_status |
UserBlocked | Блокировка пользователя | user_id, reason |
UserUnblocked | Разблокировка пользователя | user_id, reason |
LimitChanged | Изменение лимитов пользователя | user_id, limit_type, new_limit |
SanctionsChecked | Результат санкционной проверки | user_id, status, matched_lists |
Пример события:
message KycStatusChangedEvent {
RequestContext context = 1;
string user_id = 2;
KycStatus old_status = 3;
KycStatus new_status = 4;
google.protobuf.Timestamp changed_at = 5;
}maniton.ledger.events.v1
Назначение: События операционного учета и балансов
Основные события:
| Событие | Описание | Поля |
|---|---|---|
AccountCreated | Создание счета пользователя | account_id, user_id, type |
SubAccountCreated | Создание субсчета | subaccount_id, account_id, instrument_id |
OperationCreated | Создание операции | operation_id, type, user_id, amount |
OperationStatusChanged | Изменение статуса операции | operation_id, old_status, new_status |
BalanceUpdated | Обновление баланса | subaccount_id, old_balance, new_balance |
PostingCreated | Создание проводки | posting_id, operation_id, debit, credit, amount |
HoldCreated | Создание блокировки средств | hold_id, subaccount_id, amount, reason |
HoldReleased | Освобождение блокировки | hold_id, reason |
HoldConsumed | Списание блокированных средств | hold_id |
Пример события:
message BalanceUpdatedEvent {
RequestContext context = 1;
string subaccount_id = 2;
Decimal old_balance = 3;
Decimal new_balance = 4;
string operation_id = 5;
google.protobuf.Timestamp updated_at = 6;
}maniton.cfa.events.v1
Назначение: События жизненного цикла ЦФА
Основные события:
| Событие | Описание | Поля |
|---|---|---|
InstrumentRegistered | Регистрация инструмента | instrument_id, symbol, name, type |
CfaIssued | Выпуск ЦФА | operation_id, instrument_id, holder_id, amount |
CfaRedeemed | Погашение ЦФА | operation_id, instrument_id, holder_id, amount |
CfaTransferred | Перевод ЦФА | operation_id, instrument_id, from_user, to_user, amount |
IssuanceStarted | Начало процесса выпуска | operation_id, user_id, amount |
IssuanceFinalized | Финализация выпуска | operation_id, tx_hash |
RedemptionStarted | Начало процесса погашения | operation_id, user_id, amount |
RedemptionFinalized | Финализация погашения | operation_id, tx_hash |
IntegrityChecked | Проверка целостности | block_number, block_hash, valid |
Пример события:
message CfaIssuedEvent {
RequestContext context = 1;
string operation_id = 2;
string instrument_id = 3;
string holder_id = 4;
MonetaryAmount amount = 5;
string tx_hash = 6;
google.protobuf.Timestamp issued_at = 7;
}maniton.besu.events.v1
Назначение: События DLT-слоя и блокчейна
Основные события:
| Событие | Описание | Поля |
|---|---|---|
TransactionSubmitted | Транзакция отправлена в сеть | tx_hash, status |
TransactionConfirmed | Транзакция подтверждена | tx_hash, block_number |
TransactionFailed | Транзакция не прошла | tx_hash, reason |
TransactionReceiptReady | Квитанция транзакции готова | tx_hash, receipt |
BlockRead | Блок прочитан | block_number, block_hash, transactions |
BlockHashVerified | Хэш блока проверен | block_number, block_hash, valid, computed_hash |
Пример события:
message TransactionReceiptReadyEvent {
RequestContext context = 1;
TransactionReceipt receipt = 2;
}
message TransactionReceipt {
string tx_hash = 1;
BesuTxStatus status = 2;
uint64 block_number = 3;
string block_hash = 4;
uint64 gas_used = 5;
google.protobuf.Timestamp mined_at = 6;
}maniton.payments.events.v1
Назначение: События платежей и интеграций с банками
Основные события:
| Событие | Описание | Поля |
|---|---|---|
FiatDeposited | Фиатные средства поступили | user_id, amount, payment_id |
WithdrawalRequested | Запрос на вывод средств | user_id, amount, payout_id |
WithdrawalCompleted | Вывод средств завершен | user_id, amount, payout_id |
PaymentIntentCreated | Создание намерения платежа | payment_id, user_id, amount, method |
PaymentStatusChanged | Изменение статуса платежа | payment_id, old_status, new_status |
ReconciliationCompleted | Сверка завершена | report_id, balanced, notes |
Пример события:
message FiatDepositedEvent {
RequestContext context = 1;
string user_id = 2;
Money amount = 3;
string payment_id = 4;
google.protobuf.Timestamp deposited_at = 5;
}maniton.market.events.v1
Назначение: События торгов и ордеров
Основные события:
| Событие | Описание | Поля |
|---|---|---|
OrderPlaced | Ордер размещен | order_id, user_id, instrument_id, side, type, price, quantity |
OrderCancelled | Ордер отменен | order_id, reason |
OrderFilled | Ордер исполнен | order_id, filled_quantity, remaining_quantity |
TradeExecuted | Сделка исполнена | trade_id, buy_order_id, sell_order_id, instrument_id, price, quantity, fee |
OrderBookUpdated | Стакан обновлен | instrument_id, bids, asks |
Пример события:
message TradeExecutedEvent {
RequestContext context = 1;
string trade_id = 2;
string buyer_id = 3;
string seller_id = 4;
string instrument_id = 5;
Decimal quantity = 6;
Decimal price = 7;
Fee fee = 8;
google.protobuf.Timestamp executed_at = 9;
}maniton.audit.logs.v1
Назначение: Аудит логи и события безопасности
Основные события:
| Событие | Описание | Поля |
|---|---|---|
SecurityEvent | Событие безопасности | event_type, user_id, ip, details |
AdminAction | Действие администратора | admin_id, action, target_user_id, details |
ComplianceViolation | Нарушение комплаенс | violation_type, user_id, details |
SystemEvent | Системное событие | event_type, service, details |
Пример события:
message SecurityEvent {
RequestContext context = 1;
string event_type = 2;
string user_id = 3;
string ip = 4;
map<string, string> details = 5;
google.protobuf.Timestamp occurred_at = 6;
}Конфигурация топиков
Ретеншн и компакция
| Топик | Ретеншн | Компакция | Причина |
|---|---|---|---|
maniton.identity.events.v1 | 7 дней | Нет | KYC данные не критичны для долгого хранения |
maniton.ledger.events.v1 | 30 дней | Да | Операции важны для аудита |
maniton.cfa.events.v1 | 30 дней | Да | Жизненный цикл ЦФА важен для аудита |
maniton.besu.events.v1 | 7 дней | Нет | Блокчейн данные доступны в узлах |
maniton.payments.events.v1 | 30 дней | Да | Платежи требуют долгого хранения |
maniton.market.events.v1 | 7 дней | Нет | Торговые данные быстро устаревают |
maniton.audit.logs.v1 | 365 дней | Да | Аудит требует долгого хранения |
Партиционирование
# Создание топика с партиционированием
kafka-topics.sh --create \
--bootstrap-server localhost:9092 \
--topic maniton.ledger.events.v1 \
--partitions 10 \
--replication-factor 3 \
--config retention.ms=2592000000 \
--config cleanup.policy=compact \
--config segment.ms=86400000Конфигурация продюсеров
import { Producer } from 'kafkajs';
const producer = kafka.producer({
maxInFlightRequests: 1,
idempotent: true,
transactionTimeout: 30000,
retries: {
maxAttempts: 5,
initialRetryTime: 100,
maxRetryTime: 1000,
},
});
await producer.connect();
// Отправка события
await producer.send({
topic: 'maniton.ledger.events.v1',
messages: [
{
key: operationId,
value: Buffer.from(eventEnvelope.toBinary()),
headers: {
'event-type': 'OperationCreated',
'content-type': 'application/protobuf',
},
},
],
});Конфигурация консьюмеров
import { Consumer } from 'kafkajs';
const consumer = kafka.consumer({
groupId: 'ledger-service',
maxWaitTimeInMs: 1000,
maxBytesPerPartition: 1048576, // 1MB
sessionTimeout: 30000,
heartbeatInterval: 3000,
});
await consumer.subscribe({
topic: 'maniton.cfa.events.v1',
fromBeginning: false,
});
await consumer.run({
eachMessage: async ({ topic, partition, message }) => {
const eventEnvelope = EventEnvelope.fromBinary(message.value);
// Идемпотентность
const existing = await this.operationRepository.findByIdempotencyKey(
eventEnvelope.context.idempotencyKey
);
if (existing) {
return; // Уже обработано
}
// Обработка события
await this.handleEvent(eventEnvelope);
},
},
});Best Practices
1. Идемпотентность
async handleEvent(event: EventEnvelope) {
// Проверка идемпотентности
const processed = await this.processedEventsRepository.findById(
event.id
);
if (processed) {
this.logger.warn('Event already processed', { eventId: event.id });
return;
}
// Обработка события
await this.processEvent(event);
// Сохранение метки обработки
await this.processedEventsRepository.save({
eventId: event.id,
processedAt: new Date(),
});
}2. Обработка ошибок
async processEvent(event: EventEnvelope) {
try {
await this.handleEvent(event);
} catch (error) {
// Логирование ошибки
this.logger.error('Event processing failed', {
eventId: event.id,
error: error.message,
stack: error.stack,
});
// Отправка в Dead Letter Queue
await this.dlqProducer.send({
topic: 'maniton.dlq.v1',
messages: [
{
key: event.id,
value: Buffer.from(event.toBinary()),
headers: {
'original-topic': event.source,
'error': error.message,
},
},
],
});
// Повторная попытка или ручной разбор
throw error;
}
}3. Валидация событий
function validateEventEnvelope(envelope: EventEnvelope): void {
if (!envelope.id) {
throw new Error('Event ID is required');
}
if (!envelope.type) {
throw new Error('Event type is required');
}
if (!envelope.context) {
throw new Error('Request context is required');
}
if (!envelope.context.requestId) {
throw new Error('Request ID is required');
}
if (!envelope.context.idempotencyKey) {
throw new Error('Idempotency key is required');
}
}4. Логирование событий
this.logger.log('Event received', {
eventId: event.id,
type: event.type,
source: event.source,
userId: event.context?.userId,
traceId: event.context?.traceId,
});
this.logger.log('Event processed', {
eventId: event.id,
type: event.type,
duration: `${Date.now() - startTime}ms`,
});Мониторинг
Метрики Kafka
import { Counter, Histogram, Gauge } from 'prom-client';
export const kafkaMessagesProducedTotal = new Counter({
name: 'kafka_messages_produced_total',
help: 'Total number of messages produced',
labelNames: ['topic'],
});
export const kafkaMessagesConsumedTotal = new Counter({
name: 'kafka_messages_consumed_total',
help: 'Total number of messages consumed',
labelNames: ['topic', 'consumer_group'],
});
export const kafkaConsumerLag = new Gauge({
name: 'kafka_consumer_lag',
help: 'Consumer lag by topic and partition',
labelNames: ['topic', 'partition', 'consumer_group'],
});
export const kafkaMessageProcessingDuration = new Histogram({
name: 'kafka_message_processing_duration_seconds',
help: 'Time spent processing Kafka messages',
labelNames: ['topic'],
buckets: [0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1],
});Алерты
groups:
- name: kafka
rules:
- alert: KafkaConsumerLagHigh
expr: kafka_consumer_lag > 1000
for: 5m
labels:
severity: warning
annotations:
summary: 'Kafka consumer lag is high'
description: 'Consumer lag is {{ $value }} for {{ $labels.topic }}'
- alert: KafkaMessageProcessingSlow
expr: histogram_quantile(0.95, kafka_message_processing_duration_seconds) > 1
for: 5m
labels:
severity: warning
annotations:
summary: 'Kafka message processing is slow'
description: 'P95 processing time is {{ $value }}s'
- alert: KafkaBrokerDown
expr: up{job="kafka"} == 0
for: 1m
labels:
severity: critical
annotations:
summary: 'Kafka broker is down'Troubleshooting
Проблема: Consumer lag
Диагностика:
# Проверка consumer lag
kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
--group ledger-service --describe
# Проверка позиций консьюмеров
kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
--group ledger-service --membersРешение:
- Увеличьте количество консьюмеров
- Оптимизируйте обработку событий
- Увеличьте количество партиций
Проблема: Дубликаты сообщений
Причина:
- Producer retry
- Consumer restart
Решение:
- Используйте идемпотентность
- Проверяйте
idempotency_key - Храните обработанные события
Проблема: Сообщения не доставляются
Диагностика:
# Проверка топиков
kafka-topics.sh --list --bootstrap-server localhost:9092
# Проверка сообщений
kafka-console-consumer.sh --bootstrap-server localhost:9092 \
--topic maniton.ledger.events.v1 --from-beginning
# Проверка консьюмеров
kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
--listРешение:
- Проверьте конфигурацию producer
- Проверьте сетевое подключение
- Проверьте логи брокера