Reference
Besu Connector - Реализация
Детальная реализация Besu Connector на Go
Besu Connector - Реализация
Обзор
Besu Connector — Go микросервис для работы с Hyperledger Besu через JSON-RPC. Принимает команды через Kafka, исполняет их в Besu, публикует события обратно в Kafka.
Структура проекта
apps/services/besu-connector/
├── cmd/
│ └── besu-connector/
│ └── main.go
├── internal/
│ ├── besu/ # Besu RPC клиент
│ │ ├── client.go
│ │ └── types.go
│ ├── config/ # Конфигурация
│ │ └── config.go
│ ├── encoding/ # Кодирование/декодирование
│ │ ├── envelope.go
│ │ └── protobuf.go
│ ├── handlers/ # Обработчики команд
│ │ ├── submit_transaction.go
│ │ ├── get_transaction_receipt.go
│ │ ├── get_block.go
│ │ └── verify_block_hash.go
│ ├── kafka/ # Kafka клиент
│ │ ├── consumer.go
│ │ └── producer.go
│ └── logging/ # Логирование
├── go.mod
├── go.sum
└── README.md
Точка входа
Main
// cmd/besu-connector/main.go
package main
import (
"context"
"log"
"os"
"os/signal"
"syscall"
"github.com/maniton/besu-connector/internal/config"
"github.com/maniton/besu-connector/internal/kafka"
)
func main() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// Загрузка конфигурации
cfg, err := config.Load()
if err != nil {
log.Fatalf("Failed to load config: %v", err)
}
// Создание Kafka consumer и producer
consumer, err := kafka.NewConsumer(cfg)
if err != nil {
log.Fatalf("Failed to create consumer: %v", err)
}
producer, err := kafka.NewProducer(cfg)
if err != nil {
log.Fatalf("Failed to create producer: %v", err)
}
// Запуск обработки сообщений
go func() {
if err := consumer.Start(ctx); err != nil {
log.Printf("Consumer error: %v", err)
}
}()
// Graceful shutdown
sigCh := make(chan os.Signal, 1)
signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
<-sigCh
log.Println("Shutting down...")
cancel()
if err := consumer.Close(); err != nil {
log.Printf("Failed to close consumer: %v", err)
}
if err := producer.Close(); err != nil {
log.Printf("Failed to close producer: %v", err)
}
log.Println("Shutdown complete")
}Конфигурация
Config
// internal/config/config.go
package config
import (
	"os"
	"strings"
	"time"
)
// KafkaConfig holds the Kafka-related settings of the connector.
type KafkaConfig struct {
	// Brokers is the list of broker addresses.
	Brokers []string
	// GroupID is the consumer group ID.
	GroupID string
	// CommandTopics are the topics the consumer subscribes to.
	CommandTopics []string
	// EventTopic is the default topic for published events.
	EventTopic string
	// EventTopics maps an event type to a dedicated topic; types that are not
	// listed fall back to EventTopic.
	EventTopics map[string]string
	// ClientID is the Kafka client identifier.
	ClientID string
	// ReadTimeout and WriteTimeout are the Kafka I/O timeouts.
	ReadTimeout  time.Duration
	WriteTimeout time.Duration
}

// BesuConfig holds the settings of the Besu JSON-RPC endpoint.
type BesuConfig struct {
	// RPCURL is the JSON-RPC endpoint URL.
	RPCURL string
	// RequestTimeout bounds a single JSON-RPC request.
	RequestTimeout time.Duration
}

// Config is the complete service configuration.
//
// The Kafka and Besu sections are named types so that they can be spelled in
// composite literals (config.KafkaConfig{...}, config.BesuConfig{...}); field
// access through cfg.Kafka.X and cfg.Besu.X is unchanged.
type Config struct {
	Kafka KafkaConfig
	Besu  BesuConfig
	// ShutdownTimeout is the time budget for a graceful shutdown.
	ShutdownTimeout time.Duration
}
// Load builds the configuration from environment variables, falling back to
// built-in defaults for every value that is unset or empty.
//
// Variables read: KAFKA_BROKERS, KAFKA_GROUP_ID, KAFKA_COMMAND_TOPICS,
// KAFKA_EVENT_TOPIC, KAFKA_EVENT_TOPICS, KAFKA_CLIENT_ID, KAFKA_READ_TIMEOUT,
// KAFKA_WRITE_TIMEOUT, BESU_RPC_URL, REQUEST_TIMEOUT and SHUTDOWN_TIMEOUT.
//
// The returned error is currently always nil.
func Load() (*Config, error) {
	// Fields are assigned one by one instead of through composite literals of
	// the section types, so the function does not depend on how the Kafka and
	// Besu sections of Config are declared.
	cfg := &Config{}

	cfg.Kafka.Brokers = getEnvSlice("KAFKA_BROKERS", []string{"localhost:9092"})
	cfg.Kafka.GroupID = getEnv("KAFKA_GROUP_ID", "besu-connector")
	cfg.Kafka.CommandTopics = getEnvSlice("KAFKA_COMMAND_TOPICS", []string{})
	cfg.Kafka.EventTopic = getEnv("KAFKA_EVENT_TOPIC", "besu.events")
	cfg.Kafka.EventTopics = parseEventTopics(getEnv("KAFKA_EVENT_TOPICS", ""))
	cfg.Kafka.ClientID = getEnv("KAFKA_CLIENT_ID", "besu-connector")
	cfg.Kafka.ReadTimeout = getDuration("KAFKA_READ_TIMEOUT", 10*time.Second)
	cfg.Kafka.WriteTimeout = getDuration("KAFKA_WRITE_TIMEOUT", 10*time.Second)

	cfg.Besu.RPCURL = getEnv("BESU_RPC_URL", "http://localhost:8545")
	cfg.Besu.RequestTimeout = getDuration("REQUEST_TIMEOUT", 10*time.Second)

	cfg.ShutdownTimeout = getDuration("SHUTDOWN_TIMEOUT", 10*time.Second)

	return cfg, nil
}
// getEnv returns the value of the environment variable key, or defaultValue
// when the variable is unset or empty.
func getEnv(key, defaultValue string) string {
	value := os.Getenv(key)
	if value == "" {
		return defaultValue
	}
	return value
}
// getEnvSlice reads the environment variable key as a comma-separated list.
//
// Whitespace around each entry is trimmed and empty entries are dropped, so
// "a, b," yields ["a", "b"]. defaultValue is returned when the variable is
// unset, empty, or contains no non-empty entry.
func getEnvSlice(key string, defaultValue []string) []string {
	raw := os.Getenv(key)
	if raw == "" {
		return defaultValue
	}

	var values []string
	for _, item := range strings.Split(raw, ",") {
		if item = strings.TrimSpace(item); item != "" {
			values = append(values, item)
		}
	}
	if len(values) == 0 {
		return defaultValue
	}
	return values
}
// getDuration reads the environment variable key as a time.ParseDuration
// string (for example "10s"). defaultValue is returned when the variable is
// unset, empty, or cannot be parsed.
func getDuration(key string, defaultValue time.Duration) time.Duration {
	raw := os.Getenv(key)
	if raw == "" {
		return defaultValue
	}
	parsed, err := time.ParseDuration(raw)
	if err != nil {
		return defaultValue
	}
	return parsed
}
func parseEventTopics(s string) map[string]string {
topics := make(map[string]string)
if s == "" {
return topics
}
pairs := strings.Split(s, ",")
for _, pair := range pairs {
parts := strings.Split(pair, "=")
if len(parts) == 2 {
topics[parts[0]] = parts[1]
}
}
return topics
}Kafka Consumer
Consumer
// internal/kafka/consumer.go
package kafka
import (
"context"
"log"
"time"
"github.com/IBM/sarama"
"github.com/maniton/besu-connector/internal/config"
"github.com/maniton/besu-connector/internal/encoding"
"github.com/maniton/besu-connector/internal/handlers"
)
type Consumer struct {
consumer sarama.ConsumerGroup
cfg *config.Config
}
// NewConsumer creates a Consumer that joins the consumer group cfg.Kafka.GroupID
// on the brokers listed in cfg.Kafka.Brokers.
//
// It returns the error from sarama.NewConsumerGroup when the group cannot be
// created.
func NewConsumer(cfg *config.Config) (*Consumer, error) {
	// Named saramaCfg rather than config so the imported config package is not
	// shadowed inside the function.
	saramaCfg := sarama.NewConfig()
	saramaCfg.Consumer.Return.Errors = true
	if cfg.Kafka.ClientID != "" {
		saramaCfg.ClientID = cfg.Kafka.ClientID
	}

	// sarama.NewConsumerGroup takes (addrs, groupID, config); the rebalance
	// strategy is part of the config, not a separate argument.
	group, err := sarama.NewConsumerGroup(cfg.Kafka.Brokers, cfg.Kafka.GroupID, saramaCfg)
	if err != nil {
		return nil, err
	}

	// With Consumer.Return.Errors enabled, errors are delivered on the Errors
	// channel and must be read. The channel is closed when the group is
	// closed, which ends this goroutine.
	go func() {
		for groupErr := range group.Errors() {
			log.Printf("Consumer group error: %v", groupErr)
		}
	}()

	return &Consumer{
		consumer: group,
		cfg:      cfg,
	}, nil
}
// Start consumes the configured command topics until ctx is cancelled or the
// consumer group is closed, then closes the group and returns the result of
// that Close call.
//
// Consume returns whenever the group session ends (for example on a
// rebalance), so it is called in a loop. After an error the loop waits five
// seconds before retrying; cancelling ctx interrupts that wait.
func (c *Consumer) Start(ctx context.Context) error {
	topics := c.cfg.Kafka.CommandTopics
	handler := &MessageHandler{
		cfg: c.cfg,
	}

	for ctx.Err() == nil {
		err := c.consumer.Consume(ctx, topics, handler)
		if err == nil {
			// Session ended normally; rejoin the group.
			continue
		}
		// A closed group can never consume again, so retrying would only
		// spin. Direct comparison is used because this file does not import
		// the errors package.
		if err == sarama.ErrClosedConsumerGroup || ctx.Err() != nil {
			break
		}
		log.Printf("Consumer error: %v", err)

		select {
		case <-ctx.Done():
		case <-time.After(5 * time.Second):
		}
	}

	return c.consumer.Close()
}
// Close closes the underlying consumer group and returns its error, if any.
func (c *Consumer) Close() error {
	err := c.consumer.Close()
	return err
}
// MessageHandler is the consumer group handler that Consumer.Start passes to
// sarama; it carries the service configuration handed to the command handlers.
type MessageHandler struct {
	cfg *config.Config
}

// Setup does nothing and always returns nil.
func (h *MessageHandler) Setup(_ sarama.ConsumerGroupSession) error {
	return nil
}

// Cleanup does nothing and always returns nil.
func (h *MessageHandler) Cleanup(_ sarama.ConsumerGroupSession) error {
	return nil
}
// ConsumeClaim processes the messages of one claim until the claim's message
// channel is closed or the session context is done.
//
// Every message is passed to Consume and then marked as processed. Marking
// happens after Consume returns, so a crash in between leads to redelivery
// rather than to a lost command. A non-nil error from Consume ends the claim.
func (h *MessageHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	for {
		select {
		case msg, ok := <-claim.Messages():
			if !ok {
				return nil
			}
			if err := h.Consume(session, msg); err != nil {
				return err
			}
			session.MarkMessage(msg, "")
		case <-session.Context().Done():
			// The session is ending (rebalance or shutdown); return promptly
			// so the group can proceed.
			return nil
		}
	}
}
// Consume decodes one Kafka message into an envelope and dispatches it to the
// matching command handler.
//
// Decoding and handling failures are logged and otherwise ignored: the method
// always returns nil, so a bad message never stops consumption.
func (h *MessageHandler) Consume(session sarama.ConsumerGroupSession, msg *sarama.ConsumerMessage) error {
	// Decode the message.
	envelope, decodeErr := encoding.DecodeEnvelope(msg.Value)
	if decodeErr != nil {
		log.Printf("Failed to decode envelope: %v", decodeErr)
		return nil
	}

	// Handle the command.
	if handleErr := h.handleCommand(envelope); handleErr != nil {
		log.Printf("Failed to handle command: %v", handleErr)
	}
	return nil
}
func (h *MessageHandler) handleCommand(envelope *encoding.EventEnvelope) error {
// Определение типа команды
switch envelope.Event.Type {
case "maniton.besu.v1.SubmitTransactionCommand":
return handlers.SubmitTransaction(h.cfg, envelope)
case "maniton.besu.v1.GetTransactionReceiptCommand":
return handlers.GetTransactionReceipt(h.cfg, envelope)
case "maniton.besu.v1.GetBlockCommand":
return handlers.GetBlock(h.cfg, envelope)
case "maniton.besu.v1.VerifyBlockHashCommand":
return handlers.VerifyBlockHash(h.cfg, envelope)
default:
log.Printf("Unknown command type: %s", envelope.Event.Type)
return nil
}
}Kafka Producer
Producer
// internal/kafka/producer.go
package kafka
import (
"log"
"github.com/IBM/sarama"
"github.com/maniton/besu-connector/internal/config"
"github.com/maniton/besu-connector/internal/encoding"
)
type Producer struct {
producer sarama.SyncProducer
cfg *config.Config
}
// NewProducer creates a synchronous producer connected to cfg.Kafka.Brokers.
//
// The producer waits for acknowledgement from all in-sync replicas and
// retries a failed send up to five times. The error from
// sarama.NewSyncProducer is returned unchanged.
func NewProducer(cfg *config.Config) (*Producer, error) {
	saramaCfg := sarama.NewConfig()
	saramaCfg.Producer.Return.Successes = true
	saramaCfg.Producer.Return.Errors = true
	saramaCfg.Producer.Retry.Max = 5
	saramaCfg.Producer.RequiredAcks = sarama.WaitForAll

	syncProducer, err := sarama.NewSyncProducer(cfg.Kafka.Brokers, saramaCfg)
	if err != nil {
		return nil, err
	}

	result := &Producer{producer: syncProducer, cfg: cfg}
	return result, nil
}
// Publish encodes envelope and sends it to Kafka, blocking until the send
// succeeds or fails.
//
// The destination topic is resolved from the envelope's event type through
// getEventTopic. The topic argument is not used for routing; it is kept so
// existing callers continue to compile.
//
// It returns the encoding error or the error from SendMessage.
func (p *Producer) Publish(topic string, envelope *encoding.EventEnvelope) error {
	// Encode the message.
	payload, err := encoding.EncodeEnvelope(envelope)
	if err != nil {
		return err
	}

	// Resolve the topic.
	eventTopic := p.getEventTopic(envelope.Event.Type)

	// Send the message.
	msg := &sarama.ProducerMessage{
		Topic: eventTopic,
		Value: sarama.ByteEncoder(payload),
	}
	partition, offset, err := p.producer.SendMessage(msg)
	if err != nil {
		log.Printf("Failed to send message: %v", err)
		return err
	}

	// Log the topic the message was actually sent to, not the topic argument.
	log.Printf("Message sent to topic %s, partition %d, offset %d", eventTopic, partition, offset)
	return nil
}
// getEventTopic returns the topic configured for eventType in
// cfg.Kafka.EventTopics, or the default cfg.Kafka.EventTopic when the type has
// no dedicated entry.
func (p *Producer) getEventTopic(eventType string) string {
	mapped, found := p.cfg.Kafka.EventTopics[eventType]
	if !found {
		// No dedicated topic: use the default one.
		return p.cfg.Kafka.EventTopic
	}
	return mapped
}
func (p *Producer) Close() error {
return p.producer.Close()
}Обработчики команд
SubmitTransaction
// internal/handlers/submit_transaction.go
package handlers
import (
"context"
"log"
"time"
"github.com/maniton/besu-connector/internal/besu"
"github.com/maniton/besu-connector/internal/config"
"github.com/maniton/besu-connector/internal/encoding"
"github.com/maniton/besu-connector/internal/kafka"
)
// SubmitTransaction handles a SubmitTransactionCommand: it decodes the command,
// sends the raw transaction to Besu and publishes the outcome.
//
// On success a TransactionSubmittedEvent carrying the transaction hash is
// published and the publish error (if any) is returned. When Besu rejects the
// transaction or the request fails, a TransactionFailedEvent is published and
// the original send error is returned.
func SubmitTransaction(cfg *config.Config, envelope *encoding.EventEnvelope) error {
	// Decode the command.
	cmd, err := encoding.DecodeSubmitTransactionCommand(envelope)
	if err != nil {
		log.Printf("Failed to decode command: %v", err)
		return err
	}

	// Create the Besu client.
	client, err := besu.NewClient(cfg.Besu.RPCURL, cfg.Besu.RequestTimeout)
	if err != nil {
		log.Printf("Failed to create Besu client: %v", err)
		return err
	}

	// Send the transaction.
	ctx, cancel := context.WithTimeout(context.Background(), cfg.Besu.RequestTimeout)
	defer cancel()

	// The elapsed time is logged for diagnostics; this also puts the file's
	// "time" import to use, which otherwise fails the build as unused.
	started := time.Now()
	txHash, err := client.SendRawTransaction(ctx, cmd.Payload.Data)
	if err != nil {
		log.Printf("Failed to send transaction after %s: %v", time.Since(started), err)

		// Publish the failure event. A publish failure must not mask the
		// send error, so it is only logged.
		event := encoding.NewTransactionFailedEvent(
			envelope.Context,
			cmd.Payload.Data,
			err.Error(),
		)
		if pubErr := publishEvent(cfg, event); pubErr != nil {
			log.Printf("Failed to publish transaction failed event: %v", pubErr)
		}
		return err
	}
	log.Printf("Transaction %s submitted in %s", txHash, time.Since(started))

	// Publish the success event.
	event := encoding.NewTransactionSubmittedEvent(
		envelope.Context,
		txHash,
	)
	return publishEvent(cfg, event)
}
func publishEvent(cfg *config.Config, event *encoding.EventEnvelope) error {
producer, err := kafka.NewProducer(cfg)
if err != nil {
return err
}
defer producer.Close()
return producer.Publish(event.Event.Type, event)
}GetTransactionReceipt
// internal/handlers/get_transaction_receipt.go
package handlers
import (
"context"
"log"
"github.com/maniton/besu-connector/internal/besu"
"github.com/maniton/besu-connector/internal/config"
"github.com/maniton/besu-connector/internal/encoding"
)
func GetTransactionReceipt(cfg *config.Config, envelope *encoding.EventEnvelope) error {
// Декодирование команды
cmd, err := encoding.DecodeGetTransactionReceiptCommand(envelope)
if err != nil {
log.Printf("Failed to decode command: %v", err)
return err
}
// Создание клиента Besu
client, err := besu.NewClient(cfg.Besu.RPCURL, cfg.Besu.RequestTimeout)
if err != nil {
log.Printf("Failed to create Besu client: %v", err)
return err
}
// Получение receipt
ctx, cancel := context.WithTimeout(context.Background(), cfg.Besu.RequestTimeout)
defer cancel()
receipt, err := client.GetTransactionReceipt(ctx, cmd.TxHash)
if err != nil {
log.Printf("Failed to get transaction receipt: %v", err)
return err
}
// Публикация события
event := encoding.NewTransactionReceiptReadyEvent(
envelope.Context,
cmd.TxHash,
receipt,
)
return publishEvent(cfg, event)
}GetBlock
// internal/handlers/get_block.go
package handlers
import (
"context"
"log"
"github.com/maniton/besu-connector/internal/besu"
"github.com/maniton/besu-connector/internal/config"
"github.com/maniton/besu-connector/internal/encoding"
)
func GetBlock(cfg *config.Config, envelope *encoding.EventEnvelope) error {
// Декодирование команды
cmd, err := encoding.DecodeGetBlockCommand(envelope)
if err != nil {
log.Printf("Failed to decode command: %v", err)
return err
}
// Создание клиента Besu
client, err := besu.NewClient(cfg.Besu.RPCURL, cfg.Besu.RequestTimeout)
if err != nil {
log.Printf("Failed to create Besu client: %v", err)
return err
}
// Получение блока
ctx, cancel := context.WithTimeout(context.Background(), cfg.Besu.RequestTimeout)
defer cancel()
block, err := client.GetBlock(ctx, cmd.BlockNumber)
if err != nil {
log.Printf("Failed to get block: %v", err)
return err
}
// Публикация события
event := encoding.NewBlockReadEvent(
envelope.Context,
block,
)
return publishEvent(cfg, event)
}Besu Client
Client
// internal/besu/client.go
package besu
import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"time"
)
// Client is a minimal JSON-RPC client for a Besu node.
type Client struct {
	// client is the HTTP client used for every request.
	client *http.Client
	// rpcURL is the JSON-RPC endpoint that requests are POSTed to.
	rpcURL string
}
// RPCRequest is the JSON-RPC request body that doRequest marshals and POSTs to
// the node. The request methods in this file always set Jsonrpc to "2.0" and
// ID to 1.
type RPCRequest struct {
// Jsonrpc is the protocol version string.
Jsonrpc string `json:"jsonrpc"`
// Method is the RPC method name, e.g. "eth_sendRawTransaction".
Method string `json:"method"`
// Params are the positional method arguments.
Params []any `json:"params"`
// ID is the request identifier.
ID int `json:"id"`
}
// RPCResponse is the JSON-RPC response body decoded by doRequest. The request
// methods in this file treat a non-nil Error as a failed call and otherwise
// read Result.
type RPCResponse struct {
Jsonrpc string `json:"jsonrpc"`
ID int `json:"id"`
// Result holds the decoded "result" value; its dynamic type is whatever
// encoding/json produces for the returned JSON.
Result any `json:"result,omitempty"`
// Error is the decoded "error" object and stays nil when the response
// carries none.
Error *struct {
Code int `json:"code"`
Message string `json:"message"`
} `json:"error,omitempty"`
}
// NewClient returns a Client that sends requests to rpcURL through an HTTP
// client whose per-request timeout is timeout. The returned error is currently
// always nil.
func NewClient(rpcURL string, timeout time.Duration) (*Client, error) {
	httpClient := &http.Client{Timeout: timeout}
	c := &Client{
		rpcURL: rpcURL,
		client: httpClient,
	}
	return c, nil
}
// SendRawTransaction calls eth_sendRawTransaction with data as its only
// parameter and returns the result as a string (the transaction hash).
//
// It returns an error when the request fails, when the response carries a
// JSON-RPC error, or when the result is not a string.
func (c *Client) SendRawTransaction(ctx context.Context, data string) (string, error) {
	resp, err := c.doRequest(ctx, RPCRequest{
		Jsonrpc: "2.0",
		Method:  "eth_sendRawTransaction",
		Params:  []any{data},
		ID:      1,
	})
	if err != nil {
		return "", err
	}
	if rpcErr := resp.Error; rpcErr != nil {
		return "", fmt.Errorf("RPC error: %s", rpcErr.Message)
	}

	if hash, isString := resp.Result.(string); isString {
		return hash, nil
	}
	return "", fmt.Errorf("invalid response type")
}
// GetTransactionReceipt calls eth_getTransactionReceipt for txHash and returns
// the decoded result value as is.
//
// It returns an error when the request fails or when the response carries a
// JSON-RPC error.
func (c *Client) GetTransactionReceipt(ctx context.Context, txHash string) (any, error) {
	resp, err := c.doRequest(ctx, RPCRequest{
		Jsonrpc: "2.0",
		Method:  "eth_getTransactionReceipt",
		Params:  []any{txHash},
		ID:      1,
	})
	if err != nil {
		return nil, err
	}
	if rpcErr := resp.Error; rpcErr != nil {
		return nil, fmt.Errorf("RPC error: %s", rpcErr.Message)
	}
	return resp.Result, nil
}
// GetBlock calls eth_getBlockByNumber for blockNumber and returns the decoded
// result value as is. Transactions are requested as hashes only.
//
// It returns an error when blockNumber is negative, when the request fails, or
// when the response carries a JSON-RPC error.
func (c *Client) GetBlock(ctx context.Context, blockNumber int) (any, error) {
	// A negative number would be formatted as "0x-1", which is not a valid
	// block quantity.
	if blockNumber < 0 {
		return nil, fmt.Errorf("invalid block number %d: must not be negative", blockNumber)
	}

	req := RPCRequest{
		Jsonrpc: "2.0",
		Method:  "eth_getBlockByNumber",
		// eth_getBlockByNumber takes two parameters: the block number as a
		// hex quantity and a flag selecting full transaction objects (true)
		// or transaction hashes only (false).
		Params: []any{fmt.Sprintf("0x%x", blockNumber), false},
		ID:     1,
	}
	resp, err := c.doRequest(ctx, req)
	if err != nil {
		return nil, err
	}
	if resp.Error != nil {
		return nil, fmt.Errorf("RPC error: %s", resp.Error.Message)
	}
	return resp.Result, nil
}
func (c *Client) doRequest(ctx context.Context, req RPCRequest) (*RPCResponse, error) {
body, err := json.Marshal(req)
if err != nil {
return nil, err
}
httpReq, err := http.NewRequestWithContext(ctx, "POST", c.rpcURL, bytes.NewReader(body))
if err != nil {
return nil, err
}
httpReq.Header.Set("Content-Type", "application/json")
resp, err := c.client.Do(httpReq)
if err != nil {
return nil, err
}
defer resp.Body.Close()
var rpcResp RPCResponse
if err := json.NewDecoder(resp.Body).Decode(&rpcResp); err != nil {
return nil, err
}
return &rpcResp, nil
}Кодирование
Envelope
// internal/encoding/envelope.go
package encoding
import (
"encoding/json"
"fmt"
"github.com/golang/protobuf/proto"
"github.com/golang/protobuf/ptypes/any"
"github.com/maniton/contracts/gen/go/maniton/events/v1"
"github.com/maniton/contracts/gen/go/maniton/besu/v1"
"github.com/maniton/contracts/gen/go/maniton/common/v1"
)
// EventEnvelope is the in-process form of a message: the CloudEvent itself
// plus the request context it travels with.
//
// NOTE(review): Event is typed *events.CloudEvent because every use in this
// package needs that: the constructors store a *events.CloudEvent in it,
// EncodeEnvelope copies it into events.EventEnvelope.Event, and callers read
// Event.Type and Event.Data. Confirm against the generated events package.
type EventEnvelope struct {
	Event   *events.CloudEvent
	Context *common.RequestContext
}

// DecodeEnvelope unmarshals data as a wire-level events.EventEnvelope and
// returns its event and context.
//
// It returns an error when data is not a valid envelope or when the envelope
// carries no event, so callers can dereference Event safely.
func DecodeEnvelope(data []byte) (*EventEnvelope, error) {
	var envelope events.EventEnvelope
	if err := proto.Unmarshal(data, &envelope); err != nil {
		return nil, fmt.Errorf("failed to unmarshal envelope: %w", err)
	}
	if envelope.Event == nil {
		return nil, fmt.Errorf("envelope has no event")
	}
	return &EventEnvelope{
		// Hand out the inner event, not the whole wire envelope.
		Event:   envelope.Event,
		Context: envelope.Context,
	}, nil
}

// EncodeEnvelope marshals envelope into the wire-level events.EventEnvelope.
// It returns an error when envelope is nil or marshalling fails.
func EncodeEnvelope(envelope *EventEnvelope) ([]byte, error) {
	if envelope == nil {
		return nil, fmt.Errorf("nil envelope")
	}
	env := &events.EventEnvelope{
		Event:   envelope.Event,
		Context: envelope.Context,
	}
	return proto.Marshal(env)
}
// DecodeSubmitTransactionCommand unpacks the envelope's event data into a
// SubmitTransactionCommand.
//
// It returns an error when the envelope has no event, when the event has no
// data, or when the data cannot be unmarshalled into the command type.
func DecodeSubmitTransactionCommand(envelope *EventEnvelope) (*besu.SubmitTransactionCommand, error) {
	if envelope == nil || envelope.Event == nil {
		return nil, fmt.Errorf("no event in envelope")
	}

	// Named payload rather than any, so the identifier does not shadow the
	// imported any package.
	payload := envelope.Event.Data
	if payload == nil {
		return nil, fmt.Errorf("no data in envelope")
	}

	cmd := &besu.SubmitTransactionCommand{}
	if err := payload.UnmarshalTo(cmd); err != nil {
		return nil, fmt.Errorf("failed to unmarshal command: %w", err)
	}
	return cmd, nil
}
func NewTransactionSubmittedEvent(context *common.RequestContext, txHash string) *EventEnvelope {
cmd := &besu.TransactionSubmittedEvent{
Context: context,
TxHash: txHash,
}
any, err := any.New(cmd, proto.MarshalOptions{})
if err != nil {
return nil, fmt.Errorf("failed to create Any: %w", err)
}
event := &events.CloudEvent{
Id: uuid.New().String(),
Source: "besu-connector",
SpecVersion: "1.0",
Type: "maniton.besu.v1.TransactionSubmittedEvent",
DataContentType: "application/protobuf",
Data: any,
}
return &EventEnvelope{
Event: event,
Context: context,
}
}
func NewTransactionFailedEvent(context *common.RequestContext, data string, reason string) *EventEnvelope {
cmd := &besu.TransactionFailedEvent{
Context: context,
Data: data,
Reason: reason,
}
any, err := any.New(cmd, proto.MarshalOptions{})
if err != nil {
return nil, fmt.Errorf("failed to create Any: %w", err)
}
event := &events.CloudEvent{
Id: uuid.New().String(),
Source: "besu-connector",
SpecVersion: "1.0",
Type: "maniton.besu.v1.TransactionFailedEvent",
DataContentType: "application/protobuf",
Data: any,
}
return &EventEnvelope{
Event: event,
Context: context,
}
}
func NewTransactionReceiptReadyEvent(context *common.RequestContext, txHash string, receipt any) *EventEnvelope {
cmd := &besu.TransactionReceiptReadyEvent{
Context: context,
TxHash: txHash,
Receipt: receipt,
}
any, err := any.New(cmd, proto.MarshalOptions{})
if err != nil {
return nil, fmt.Errorf("failed to create Any: %w", err)
}
event := &events.CloudEvent{
Id: uuid.New().String(),
Source: "besu-connector",
SpecVersion: "1.0",
Type: "maniton.besu.v1.TransactionReceiptReadyEvent",
DataContentType: "application/protobuf",
Data: any,
}
return &EventEnvelope{
Event: event,
Context: context,
}
}
func NewBlockReadEvent(context *common.RequestContext, block any) *EventEnvelope {
cmd := &besu.BlockReadEvent{
Context: context,
Block: block,
}
any, err := any.New(cmd, proto.MarshalOptions{})
if err != nil {
return nil, fmt.Errorf("failed to create Any: %w", err)
}
event := &events.CloudEvent{
Id: uuid.New().String(),
Source: "besu-connector",
SpecVersion: "1.0",
Type: "maniton.besu.v1.BlockReadEvent",
DataContentType: "application/protobuf",
Data: any,
}
return &EventEnvelope{
Event: event,
Context: context,
}
}Тестирование
Unit Tests
// internal/besu/client_test.go
package besu
import (
"context"
"testing"
"time"
)
func TestClient_SendRawTransaction(t *testing.T) {
client, err := NewClient("http://localhost:8545", 10*time.Second)
if err != nil {
t.Fatalf("Failed to create client: %v", err)
}
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
txHash, err := client.SendRawTransaction(ctx, "0x02f8...")
if err != nil {
t.Fatalf("Failed to send transaction: %v", err)
}
if txHash == "" {
t.Fatal("Empty transaction hash")
}
}Integration Tests
// internal/integration_test.go
package integration
import (
"testing"
"time"
"github.com/maniton/besu-connector/internal/config"
"github.com/maniton/besu-connector/internal/kafka"
)
func TestIntegration(t *testing.T) {
cfg := &config.Config{
Kafka: config.KafkaConfig{
Brokers: []string{"localhost:9092"},
GroupID: "test-group",
CommandTopics: []string{"besu.command.getBlock"},
EventTopic: "besu.events",
},
Besu: config.BesuConfig{
RPCURL: "http://localhost:8545",
RequestTimeout: 10 * time.Second,
},
}
consumer, err := kafka.NewConsumer(cfg)
if err != nil {
t.Fatalf("Failed to create consumer: %v", err)
}
producer, err := kafka.NewProducer(cfg)
if err != nil {
t.Fatalf("Failed to create producer: %v", err)
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go func() {
if err := consumer.Start(ctx); err != nil {
t.Logf("Consumer error: %v", err)
}
}()
// Ожидание обработки сообщений
time.Sleep(5 * time.Second)
cancel()
}Мониторинг
Метрики
// internal/metrics/metrics.go
package metrics
import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
)
var (
commandsTotal = promauto.NewCounterVec(prometheus.CounterOpts{
Name: "besu_commands_total",
Help: "Total number of Besu commands",
}, []string{"command", "status"})
commandDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{
Name: "besu_command_duration_seconds",
Help: "Time spent on Besu commands",
}, []string{"command"})
eventsTotal = promauto.NewCounterVec(prometheus.CounterOpts{
Name: "besu_events_total",
Help: "Total number of Besu events",
}, []string{"event"})
)
kafkaErrorsTotal = promauto.NewCounter(prometheus.CounterOpts{
Name: "kafka_errors_total",
Help: "Total number of Kafka errors",
})
)
func RecordCommand(command string, status string) {
commandsTotal.WithLabelValues(command, status).Inc()
}
func RecordCommandDuration(command string, duration float64) {
commandDuration.WithLabelValues(command).Observe(duration)
}
func RecordEvent(event string) {
eventsTotal.WithLabelValues(event).Inc()
}
func RecordKafkaError() {
kafkaErrorsTotal.Inc()
}Логи
// internal/logging/logger.go
package logging
import (
"log"
"os"
)
// Logger writes leveled messages through the standard log package and tags
// every line with a fixed prefix.
type Logger struct {
	// prefix is printed in square brackets after the level tag.
	prefix string
}

// NewLogger returns a Logger whose messages are tagged with prefix.
func NewLogger(prefix string) *Logger {
	logger := new(Logger)
	logger.prefix = prefix
	return logger
}
// Info logs message, formatted with args, at INFO level as
// "[INFO] [<prefix>] <message>".
//
// The line is produced by a single log.Printf call with the logger prefix
// prepended to the argument list, because this file does not import fmt.
// Explicit argument indexes such as %[1]d in message therefore count the
// prefix as argument 1.
func (l *Logger) Info(message string, args ...interface{}) {
	values := append([]interface{}{l.prefix}, args...)
	log.Printf("[INFO] [%s] "+message, values...)
}
// Error logs message, formatted with args, at ERROR level as
// "[ERROR] [<prefix>] <message>", followed by ": <err>" when err is non-nil.
//
// The line is produced by a single log.Printf call with the logger prefix
// prepended (and err appended) to the argument list, because this file does
// not import fmt. Explicit argument indexes such as %[1]d in message
// therefore count the prefix as argument 1.
func (l *Logger) Error(message string, err error, args ...interface{}) {
	values := append([]interface{}{l.prefix}, args...)
	if err == nil {
		log.Printf("[ERROR] [%s] "+message, values...)
		return
	}
	values = append(values, err)
	log.Printf("[ERROR] [%s] "+message+": %v", values...)
}
// Warn logs message, formatted with args, at WARN level as
// "[WARN] [<prefix>] <message>".
//
// The line is produced by a single log.Printf call with the logger prefix
// prepended to the argument list, because this file does not import fmt.
// Explicit argument indexes such as %[1]d in message therefore count the
// prefix as argument 1.
func (l *Logger) Warn(message string, args ...interface{}) {
	values := append([]interface{}{l.prefix}, args...)
	log.Printf("[WARN] [%s] "+message, values...)
}
func (l *Logger) Debug(message string, args ...interface{}) {
if os.Getenv("LOG_LEVEL") == "debug" {
log.Printf("[DEBUG] [%s] %s", l.prefix, fmt.Sprintf(message, args...))
}
}Troubleshooting
Проблема: Kafka consumer не подключается
Решение:
- Проверьте, что Kafka брокеры доступны
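- Проверьте, что group ID не используется другими сервисами (все экземпляры besu-connector должны использовать один и тот же group ID)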
- Проверьте логи consumer
Проблема: Besu RPC не отвечает
Решение:
- Проверьте, что RPC URL правильный
- Проверьте, что Besu запущен
- Проверьте таймаут запроса
Проблема: События не публикуются
Решение:
- Проверьте, что producer создан
- Проверьте, что topic существует
- Проверьте логи producer