EasyKafka:让消息收发更简单
2025-05-30 08:25 阅读(15)

一、背景与现状

目前,我们团队在使用 Kafka 时主要存在以下几类问题:


一个工程对接多套 Kafka 集群,工程代码缺少对多套集群的规范管理;

生产环境分为基线和灰度,对接灰度 Kafka 时,工程改动繁琐;

生产/消费消息时,出现大量偏技术性的重复代码:序列化、反序列化、判空、重试等;

在发送消息、消费消息过程中,存在链路丢标的情况;


因此,建设统一消息组件,以解决上述问题,简化消息收发。


1.1 发送消息现状

生产环境有两套:生产基线环境和生产灰度环境。

1、基线生产者

@Service
public class KafkaProducerServiceImpl {
    
    // [问题]: 配置命名没有规范
    @Value("${kafka.servers}")
    private String kafkaServers;
​
    private Producer<String, String> producer;
    
    @PostConstruct
    public void init() {
        // [问题]: 重复性代码
        Properties props = new Properties();
        props.put("bootstrap.servers", kafkaServers);
        props.put("acks", "all");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("retries", 0);
        props.put("linger.ms", 5);
        props.put("batch.size", 16384);
        props.put("buffer.memory", 33554432);
        producer = new KafkaProducer<>(props);
    }
    
    public void sendMessage(String topic, String key, String value) {
        ProducerRecord<String,String> producerRecord = new ProducerRecord<>(topic, null, key, value, null);
        Future<RecordMetadata> sendFuture = producer.send(producerRecord);
        sendFuture.get();
    }
}


2、灰度生产者

public class GrayKafkaProducerServiceImpl {
    
    // [问题]: 配置命名没有规范
    @Value("${gray.kafka.servers}")
    private String grayKafkaServers;
    
    @PostConstruct
    public void init() {
        // [问题]: 重复性代码
        Properties props = new Properties();
        // 【只有这里不同,其他地方同 KafkaProducerServiceImpl#init()】
        props.put("bootstrap.servers", grayKafkaServers);
        // 省略相同代码 ...
    }
}


3、发送消息

public void send(Message message) {
    // [问题]: 重复性代码-序列化
    String value = JSON.toJSONString(message);
    // [问题]: 灰度改动繁琐
    if (GrayUtils.isGray()) {
        grayKafkaProducerService.sendMessage("order.topic", message.getOrderId(), value);
    } else {
        kafkaProducerServiceImpl.sendMessage("order.topic", message.getOrderId(), value);
    }
}


2.2 消费消息现状

1、配置消费者


@Configuration
@EnableKafka
public class ConsumerConfig {
​
    // [问题]: 配置命名没有规范
    @Value("${kafka.servers}")
    private String kafkaServers;
​
    @Value("${gray.kafka.servers}")
    private String grayKafkaServers;
​
    @Bean
    public DefaultKafkaConsumerFactory<String, String> consumerFactory() {
        // [问题]: 重复性代码
        HashMap<String, Object> configs = new HashMap<>();
        configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServers);
        configs.put(ConsumerConfig.GROUP_ID_CONFIG,"example-consumer-group");
        configs.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
        configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
        configs.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 480000);
        configs.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 30);
        return new DefaultKafkaConsumerFactory<>(configs);
    }
​
    @Bean
    public DefaultKafkaConsumerFactory<String,String> grayConsumerFactory() {
        // 同上方法 consumerFactory()
    }
}


2、注册监听


@Bean
public ConcurrentMessageListenerContainer concurrentMessageListenerContainer(MessageListener exampleMessageListener) {
    ContainerProperties containerProperties = new ContainerProperties("order.topic");
    containerProperties.setMessageListener(exampleMessageListener);
    
    ConcurrentMessageListenerContainer<String, String> listenerContainer = new ConcurrentMessageListenerContainer<>(
            // // [问题]: 灰度改动繁琐(根据环境选择初始化, 灰度 或者 基线)
            GrayUtils.isGray() ? grayConsumerFactory() : consumerFactory(),
            containerProperties);
    listenerContainer.setConcurrency(15);
    return listenerContainer;
}


3、逻辑处理


@Service
public class ExampleMessageListener extends MessageListener<String, String> {
    @Override
    public void onMessage(ConsumerRecord<String, String> record) {
        // [问题]: 重复性代码-反序列化
        Message message = JSON.parseObject(record.value(), Message.class);
        try {
            doProcess(message);
        } catch (CustomException e) {
            // 自定义异常, 流程结束, 不再重试
            log.error(e.getMessage, e);
        } catch (Throwable ex) {
            // 其它异常, 流程结束, 等待重试
            doRetry(message);
        }
    }
}


二、EasyKafka - 快速开始

EasyKafka 是基于 Spring Kafka 的增强,原 Spring Kafka 所有功能完全适配。

EasyKafka 的建设目标:使消息的发送和接收更简单。项目地址:github.com/studeyang/e…

主要解决以下问题:


简化消息发送和接收流程

封装基线与灰度环境兼容的复杂度

统一消息重试


2.1 发送消息

1、引入依赖

<dependency>
    <groupId>io.github.studeyang</groupId>
    <artifactId>easykafka-spring-boot-starter</artifactId>
</dependency>


2、配置


最少启动配置

easykafka:
  init: #初始化
    kafkaCluster: #kafka集群容器
    - cluster: send
      brokers: send-kafka.domain.com:9092
    - cluster: send
      brokers: send-gray-kafka.domain.com:9092
      tag: gray
    - cluster: edms
      brokers: edms-kafka.domain.com:9092


3、定义一条消息

import io.github.open.easykafka.client.annotation.Topic;
import io.github.open.easykafka.client.message.Event;

@Topic(cluster = "send", name = "easykafka-example-topic")
public class ExampleEvent extends Event {
    private String name;
}


4、发送消息

import io.github.open.easykafka.client.EventPublisher;

@RunWith(SpringRunner.class)
@SpringBootTest
public class EventPublisherTest {
    @Test
    public void sendExampleEvent() {
        ExampleEvent event = new ExampleEvent();
        event.setName("test");
        EventPublisher.publish(event);
    }
}


2.2 消费消息

1、引入依赖


引入 EasyKafka 和 消息定义包的依赖。


<!-- easykafka -->
<dependency>
    <groupId>io.github.studeyang</groupId>
    <artifactId>easykafka-spring-boot-starter</artifactId>
</dependency>
 
<!-- 消息定义 -->
<dependency>
    <groupId>io.github.studeyang</groupId>
    <artifactId>es-send-basic-api</artifactId>
</dependency>


2、实现消费逻辑

import io.github.open.easykafka.client.annotation.EventHandler;
import org.springframework.stereotype.Service;

@Service
public class SingleEventHandler {
    @EventHandler
    public void handle(ExampleEvent event) {
        System.out.println("收到了一条消息: " + event);
    }
}


三、EasyKafka - 进阶使用

3.1 统一配置

1、初始化配置

easykafka:
  #初始化
  init:
    kafkaCluster: #kafka集群
    - cluster: send #会生成一个 sendProducer 的 SpringBean
      brokers: send-kafka.domain.com:9092
      tag: BASE  #默认BASE, 可选:BASE,GRAY
    - cluster: send #会生成一个 sendGrayProducer 的 SpringBean
      brokers: send-gray-kafka.domain.com:9092
      tag: GRAY
    - cluster: edms
      brokers: edms-kafka.domain.com:9092
    producer: #生产者
      #可选
      - beanName: sendProducer #选择对应的 SpringBean
        config: #kafka producer扩展配置, Map
          acks: 1        ### 覆盖原配置 ack=all
          linger.ms: 10  ### 覆盖原配置 linger.ms=5
      - beanName: sendGrayProducer
        config:
    consumer: #消费者
      #可选
      - beanName: sendConsumer
        config: #kafka consumer扩展配置, Map
          enable.auto.commit: false  ### 覆盖原配置 enable.auto.commit=true
          max.poll.records: 50       ### 覆盖原配置 max.poll.records=30
      - beanName: sendGrayConsumer
        config:


2、运行时配置

easykafka:
  #运行时
  runtime:
    producer: #生产者
      #可选
      partitionSize: 500  #批量发送消息时, 每批次消息数
      async:
        corePoolSize: 3       #核心线程数
        maxPoolSize: 5        #最大线程数
        keepAliveSeconds: 60  #线程保持时间
        queueCapacity: 100    #线程队列大小
        rejectedHandler: java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy  #线程满时,拒绝策略
        threadNamePrefix: kafka-async-producer-  #线程名


3.2 生产者

1、初始化Kafka

easykafka:
  #初始化
  init:
    kafkaCluster: #kafka集群
    - cluster: send
      brokers: send-kafka.domain.com:9092
      tag: base  #可选 base,gray
    - cluster: send
      brokers: send-gray-kafka.domain.com:9092
      tag: gray
    - cluster: edms
      brokers: edms-kafka.domain.com:9092
      tag: base  #默认base


当初始化 Kafka 配置如上时,程序启动后会产生一个生产者容器 ProducerContainer。



容器内容通过日志打印出来,如下:

[EasyKafka] ProducerContainer:
{"bean":"postGrayProducer","cluster":"post","tag":"BASE"}
{"bean":"edmsGrayProducer","cluster":"edms","tag":"BASE"}


如果当前运行的是灰度环境(ENV_TAG=gray),容器包含内容如下::

[EasyKafka] ProducerContainer:
{"bean":"postGrayProducer","cluster":"post","tag":"GRAY"}
{"bean":"edmsGrayProducer","cluster":"edms","tag":"GRAY"}


2、消息发送流程

定义好消息后:

@Topic(cluster = "send", name = "easykafka-example-topic")
public class ExampleEvent extends Event {
    private String name;
}


消息发送时,会根据消息定义的 @Topic(cluster) 值,从生产者容器 ProducerContainer 选择合适的 Producer 发送消息。


如果当前运行的是灰度环境(ENV_TAG=gray),在程序启动后,生产者容器内容也会发生相应的变化。

3、扩展 producer 配置

例如我想改 producer 的配置,acks=1; linger.ms=10,可以通过 config 来扩展或覆盖配置。

easykafka:
  #初始化
  init:
    producer: #生产者
      #可选
      - beanName: sendProducer #选择对应的 SpringBean
        config: #kafka producer扩展配置, Map
          acks: 1        ### 覆盖原配置 ack=all
          linger.ms: 10  ### 覆盖原配置 linger.ms=5
      - beanName: sendGrayProducer
        config:


4、使用原生 producer

也可以用原生的 kafka producer 发送消息,但这种方式不推荐使用。

@Service
public class DemoServiceImpl {
    @Autowired
    @Qualifier("sendProducer")
    private Producer<String, String> sendProducer;
}


3.3 消费者

1、初始化监听器

EasyKafka 会扫描 @EventHandler 注解,以获得所有的监听器。

@Service
public class SingleEventHandler {

    @EventHandler
    public void handle(ExampleEvent event) {
        System.out.println("SingleEventHandler 收到消息: " + event);
    }

    @EventHandler(concurrency = "15", groupId = "groupId-example-consumer")
    public void handle(Example2Event event) {
        System.out.println("SingleEventHandler 收到消息: " + event);
    }
}


@EventHandler 的默认配置:


cluster 默认从 ExampleEvent 的 @Topic 注解中获取

topics 默认从 ExampleEvent 的 @Topic 注解中获取

groupId 默认取 spring.application.name

containerFactory 默认取 {cluster} + "KafkaListenerContainerFactory"


所有的监听器都会放到监听器容器 ListenerContainer 中。在代码扫描后,监听器容器中的内容通过控制台日志打印,如下:

[EasyKafka] ListenerContainer:
{"cluster":"send","event":"io.github.open.easykafka.event.ExampleEvent","groupId":"example-consumer","topics":"easykafka-example-topic"}
{"cluster":"send","event":"io.github.open.easykafka.event.Example2Event","groupId":"groupId-example-consumer","topics":"easykafka-example-topic"}


2、消息消费流程


消息一旦消费失败,会使用 Spring Kafka 的默认错误处理 SeekToCurrentErrorHandler 进行消息重试,最大重试次数= 9 次。

3、扩展 consumer 配置

例如我想改 consumer 的配置,enable.auto.commit=false; max.poll.records=50,可以通过 config 来扩展或覆盖配置。

easykafka:
  #初始化
  init:
    consumer: #消费者
      #可选
      - beanName: sendConsumer
        config: #kafka consumer扩展配置, Map
          enable.auto.commit: false  ### 覆盖原配置 enable.auto.commit=true
          max.poll.records: 50       ### 覆盖原配置 max.poll.records=30
      - beanName: sendGrayConsumer
        config:


也可以使用 @EventHandler 的 properties 属性来配置,该属性与 @KafkaListener 的 properties 用法和功能完全一样,例如:

@Service
public class SingleEventHandler {
​
    @EventHandler(properties = {"enable.auto.commit=false", "max.poll.records=50"})
    public void handle(ExampleEvent event) {
        System.out.println("SingleEventHandler 收到消息: " + event);
    }
}