Spring boot 1.5.1
以下安装方式仅适合开发环境,生产环境请使用这个脚本安装 https://github.com/oscm/shell/tree/master/mq/kafka
# Download and unpack Kafka 0.10.2.0 (Scala 2.12 build) into /srv.
cd /usr/local/src
wget http://apache.communilink.net/kafka/0.10.2.0/kafka_2.12-0.10.2.0.tgz
tar zxvf kafka_2.12-0.10.2.0.tgz
mv kafka_2.12-0.10.2.0 /srv/

# Keep a pristine copy of the broker config before editing it (bash brace expansion).
cp /srv/kafka_2.12-0.10.2.0/config/server.properties{,.original}
echo "advertised.host.name=localhost" >> /srv/kafka_2.12-0.10.2.0/config/server.properties

# Version-independent path used by the commands that follow.
ln -s /srv/kafka_2.12-0.10.2.0 /srv/kafka
启动 Kafka 服务
# Both commands stay in the foreground, so run each one in its own terminal.
# ZooKeeper must be up before the broker starts.
# Absolute config paths are used so the commands work from any working directory.
/srv/kafka/bin/zookeeper-server-start.sh /srv/kafka/config/zookeeper.properties
/srv/kafka/bin/kafka-server-start.sh /srv/kafka/config/server.properties
kafka from-beginning。-daemon 表示守护进程方式在后台启动
# -daemon starts each process in the background; ZooKeeper first, then the broker.
# Absolute config paths are used so the commands work from any working directory.
/srv/kafka/bin/zookeeper-server-start.sh -daemon /srv/kafka/config/zookeeper.properties
/srv/kafka/bin/kafka-server-start.sh -daemon /srv/kafka/config/server.properties
停止 Kafka 服务
# Stop the broker first, then ZooKeeper (reverse of the start order).
/srv/kafka/bin/kafka-server-stop.sh
/srv/kafka/bin/zookeeper-server-stop.sh
<!-- Maven dependency on spring-kafka; no <version> element is declared in this snippet. -->
<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency>
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <!-- Project coordinates; packaged as a WAR. -->
    <groupId>cn.netkiller</groupId>
    <artifactId>deploy</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>war</packaging>

    <name>deploy.netkiller.cn</name>
    <description>Deploy project for Spring Boot</description>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>1.5.1.RELEASE</version>
        <relativePath /> <!-- lookup parent from repository -->
    </parent>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <!-- Starters that are currently switched off. -->
        <!-- <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-actuator</artifactId> </dependency> -->
        <!-- <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-jpa</artifactId> </dependency> -->
        <!-- <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-mongodb</artifactId> </dependency> -->
        <!-- <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> </dependency> -->

        <!-- Redis, session and cache. -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.session</groupId>
            <artifactId>spring-session-data-redis</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-cache</artifactId>
        </dependency>

        <!-- Security, web and websocket. -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-security</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-websocket</artifactId>
        </dependency>

        <!-- WebJars front-end assets. -->
        <dependency>
            <groupId>org.webjars</groupId>
            <artifactId>webjars-locator</artifactId>
        </dependency>
        <dependency>
            <groupId>org.webjars</groupId>
            <artifactId>sockjs-client</artifactId>
            <version>1.0.2</version>
        </dependency>
        <dependency>
            <groupId>org.webjars</groupId>
            <artifactId>stomp-websocket</artifactId>
            <version>2.3.3</version>
        </dependency>
        <dependency>
            <groupId>org.webjars</groupId>
            <artifactId>bootstrap</artifactId>
            <version>3.3.7</version>
        </dependency>
        <dependency>
            <groupId>org.webjars</groupId>
            <artifactId>jquery</artifactId>
            <version>3.1.0</version>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-mail</artifactId>
        </dependency>

        <!-- JSP support. -->
        <dependency>
            <groupId>org.apache.tomcat.embed</groupId>
            <artifactId>tomcat-embed-jasper</artifactId>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>javax.servlet</groupId>
            <artifactId>jstl</artifactId>
        </dependency>

        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
        </dependency>
        <dependency>
            <groupId>com.google.code.gson</groupId>
            <artifactId>gson</artifactId>
            <!-- <version>2.7</version> -->
        </dependency>
        <dependency>
            <groupId>com.caucho</groupId>
            <artifactId>hessian</artifactId>
            <version>4.0.38</version>
        </dependency>

        <!-- Spring for Apache Kafka. -->
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>

        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <configuration>
                    <mainClass>cn.netkiller.Application</mainClass>
                </configuration>
            </plugin>
            <!-- Tests are skipped during the build. -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-surefire-plugin</artifactId>
                <configuration>
                    <skip>true</skip>
                </configuration>
            </plugin>
        </plugins>
    </build>

    <repositories>
        <repository>
            <id>spring-snapshots</id>
            <name>Spring Snapshots</name>
            <url>https://repo.spring.io/snapshot</url>
            <snapshots>
                <enabled>true</enabled>
            </snapshots>
        </repository>
        <repository>
            <id>spring-milestones</id>
            <name>Spring Milestones</name>
            <url>https://repo.spring.io/milestone</url>
            <snapshots>
                <enabled>false</enabled>
            </snapshots>
        </repository>
    </repositories>

    <pluginRepositories>
        <pluginRepository>
            <id>spring-snapshots</id>
            <name>Spring Snapshots</name>
            <url>https://repo.spring.io/snapshot</url>
            <snapshots>
                <enabled>true</enabled>
            </snapshots>
        </pluginRepository>
        <pluginRepository>
            <id>spring-milestones</id>
            <name>Spring Milestones</name>
            <url>https://repo.spring.io/milestone</url>
            <snapshots>
                <enabled>false</enabled>
            </snapshots>
        </pluginRepository>
    </pluginRepositories>
</project>
package cn.netkiller;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.scheduling.annotation.EnableScheduling;

/**
 * Entry point of the application.
 *
 * <p>{@link SpringBootApplication} is itself meta-annotated with
 * {@link EnableAutoConfiguration} and {@link ComponentScan}, so those two are
 * not repeated on this class. Repeating a bare {@code @ComponentScan} would
 * also replace the scan configuration that {@code @SpringBootApplication}
 * declares.
 */
@SpringBootApplication
@EnableScheduling
public class Application {

    /**
     * Boots the Spring context.
     *
     * @param args command-line arguments, passed through to Spring Boot
     */
    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }
}
package cn.netkiller.kafka;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;

import java.util.HashMap;
import java.util.Map;

/**
 * Kafka consumer wiring: consumer properties, the consumer factory, the
 * listener container factory used by {@code @KafkaListener} methods, and the
 * {@link Listener} bean itself.
 */
@Configuration
@EnableKafka
public class KafkaConsumerConfig {

    /**
     * Container factory for {@code @KafkaListener} endpoints.
     *
     * @return a concurrent container factory backed by {@link #consumerFactory()}
     */
    @Bean
    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        // factory.setConcurrency(1);
        // factory.getContainerProperties().setPollTimeout(3000);
        return factory;
    }

    /**
     * @return a consumer factory producing {@code <String, String>} consumers
     *         configured from {@link #consumerConfigs()}
     */
    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    /**
     * Raw Kafka consumer properties.
     *
     * <p>The key deserializer is {@link StringDeserializer} so that it matches
     * the {@code <String, String>} type parameters used by the factories above.
     * The previous {@link IntegerDeserializer} does not yield {@code String}
     * keys and only worked while every record had a {@code null} key.
     *
     * @return a mutable map of consumer properties
     */
    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> propsMap = new HashMap<>();
        propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "www.netkiller.cn:9092");
        propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, "test-consumer-group");
        propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
        propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
        propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
        propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return propsMap;
    }

    /**
     * @return the bean whose {@code @KafkaListener} method receives the records
     */
    @Bean
    public Listener listener() {
        return new Listener();
    }
}
package cn.netkiller.kafka;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;

import java.util.concurrent.CountDownLatch;
import java.util.logging.Logger;

/**
 * Receives records from the {@code test} topic, reports each one to the
 * logger and to standard output, and counts down a latch that callers can
 * wait on.
 */
public class Listener {

    protected Logger logger = Logger.getLogger(Listener.class.getName());

    /** Starts at 1 and is counted down once per received record. */
    private CountDownLatch countDownLatch1 = new CountDownLatch(1);

    public Listener() {
    }

    /**
     * Handles one record from the {@code test} topic.
     *
     * @param record the consumed record
     */
    @KafkaListener(topics = "test")
    public void listen(ConsumerRecord<?, ?> record) {
        final String line = "Received message: " + record.toString();
        logger.info(line);
        System.out.println(line);
        countDownLatch1.countDown();
    }

    /**
     * @return the latch counted down by {@link #listen(ConsumerRecord)}
     */
    public CountDownLatch getCountDownLatch1() {
        return countDownLatch1;
    }
}
$ cd /srv/kafka
$ bin/kafka-console-producer.sh --broker-list 47.89.35.55:9092 --topic test
This is test message.
每输入一行回车后发送到你的Spring boot kafka 程序
上面的例子仅仅是做了一个热身,现在我们将实现 一个完整的例子。
基于 springboot。例 2.5. Spring boot with Apache kafka.
SpringApplication
package cn.netkiller;import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.EnableAutoConfiguration; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.context.annotation.ComponentScan; //import org.springframework.data.jpa.repository.config.EnableJpaRepositories; //import org.springframework.data.mongodb.repository.config.EnableMongoRepositories; import org.springframework.scheduling.annotation.EnableScheduling;@SpringBootApplication @EnableAutoConfiguration @ComponentScan // @EnableMongoRepositories // @EnableJpaRepositories @EnableScheduling public class Application {public static void main(String[] args) {SpringApplication.run(Application.class, args);} }
Consumer configuration
package cn.netkiller.kafka.config;

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

import cn.netkiller.kafka.consumer.Consumer;

/**
 * Kafka consumer wiring: consumer properties, the consumer factory, the
 * listener container factory and the {@link Consumer} bean.
 */
@Configuration
@EnableKafka
public class ConsumerConfiguration {

    /**
     * Raw Kafka consumer properties.
     *
     * <p>The key deserializer is {@link StringDeserializer} so that it matches
     * the {@code <String, String>} type parameters of
     * {@link #consumerFactory()}. The previous {@link IntegerDeserializer}
     * does not yield {@code String} keys and only worked while every record
     * had a {@code null} key.
     *
     * @return a mutable map of consumer properties
     */
    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        // list of host:port pairs used for establishing the initial
        // connections to the Kafka cluster
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "www.netkiller.cn:9092");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        // consumer groups allow a pool of processes to divide the work of
        // consuming and processing records
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "helloworld");
        return props;
    }

    /**
     * @return a consumer factory producing {@code <String, String>} consumers
     *         configured from {@link #consumerConfigs()}
     */
    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    /**
     * @return the container factory used for {@code @KafkaListener} endpoints
     */
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }

    /**
     * @return the bean whose {@code @KafkaListener} method receives the messages
     */
    @Bean
    public Consumer receiver() {
        return new Consumer();
    }
}
Consumer
package cn.netkiller.kafka.consumer;

import java.util.concurrent.CountDownLatch;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;

/**
 * Listens on the {@code helloworld.t} topic, logs every message and counts
 * down a latch so that callers can wait for a delivery.
 */
public class Consumer {

    private static final Logger logger = LoggerFactory.getLogger(Consumer.class);

    /** Starts at 1 and is counted down once per received message. */
    private final CountDownLatch latch = new CountDownLatch(1);

    public Consumer() {
    }

    /**
     * @return the latch counted down by {@link #receiveMessage(String)}
     */
    public CountDownLatch getLatch() {
        return this.latch;
    }

    /**
     * Handles one message from the {@code helloworld.t} topic.
     *
     * @param message the message payload
     */
    @KafkaListener(topics = "helloworld.t")
    public void receiveMessage(String message) {
        logger.info("received message='{}'", message);
        this.latch.countDown();
    }
}
springboot web。例 2.6. Spring boot with Apache kafka.
Producer configuration
package cn.netkiller.kafka.config;

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

import cn.netkiller.kafka.producer.Producer;

/**
 * Kafka producer wiring: producer properties, the producer factory, the
 * {@link KafkaTemplate} and the {@link Producer} bean.
 */
@Configuration
public class ProducerConfiguration {

    /**
     * Raw Kafka producer properties.
     *
     * <p>The key serializer is {@link StringSerializer} so that it matches the
     * {@code <String, String>} type parameters of {@link #kafkaTemplate()}.
     * The previous {@link IntegerSerializer} cannot serialize a {@code String}
     * key and only worked while every send used a {@code null} key.
     *
     * @return a mutable map of producer properties
     */
    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        // list of host:port pairs used for establishing the initial
        // connections to the Kafka cluster
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "www.netkiller.cn:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        // value to block, after which it will throw a TimeoutException
        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 5000);
        return props;
    }

    /**
     * @return a producer factory configured from {@link #producerConfigs()}
     */
    @Bean
    public ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    /**
     * @return the template used to send {@code <String, String>} records
     */
    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

    /**
     * @return the application-level sender built on {@link #kafkaTemplate()}
     */
    @Bean
    public Producer sender() {
        return new Producer();
    }
}
Producer
package cn.netkiller.kafka.producer;import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.support.SendResult; import org.springframework.util.concurrent.ListenableFuture; import org.springframework.util.concurrent.ListenableFutureCallback;public class Producer {private static final Logger logger = LoggerFactory.getLogger(Producer.class);/** public Sender() { // TODO Auto-generated constructor stub }*/@Autowiredprivate KafkaTemplate<String, String> kafkaTemplate;public void sendMessage(String topic, String message) {// the KafkaTemplate provides asynchronous send methods returning a// FutureListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, message);// you can register a callback with the listener to receive the result// of the send asynchronouslyfuture.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {@Overridepublic void onSuccess(SendResult<String, String> result) {logger.info("sent message='{}' with offset={}", message, result.getRecordMetadata().offset());}@Overridepublic void onFailure(Throwable ex) {logger.error("unable to send message='{}'", message, ex);}});// alternatively, to block the sending thread, to await the result,// invoke the future’s get() method} }
Controller
package cn.netkiller.web;

import java.util.concurrent.TimeUnit;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;

import cn.netkiller.kafka.consumer.Consumer;
import cn.netkiller.kafka.producer.Producer;

/**
 * Test endpoints under {@code /test}: a liveness ping and a Kafka
 * send/receive round trip.
 */
@Controller
@RequestMapping("/test")
public class KafkaTestController {

    // Bound to this class; the original referenced IndexController, which is
    // neither imported nor related to this controller.
    private static final Logger logger = LoggerFactory.getLogger(KafkaTestController.class);

    @Autowired
    private Producer sender;

    @Autowired
    private Consumer receiver;

    public KafkaTestController() {
    }

    /**
     * @return the literal string {@code PONG}
     */
    @RequestMapping("/ping")
    @ResponseBody
    public String ping() {
        return "PONG";
    }

    /**
     * Sends a fixed message to {@code helloworld.t}, waits up to ten seconds
     * for the consumer's latch to reach zero, and logs the remaining count.
     *
     * <p>NOTE(review): the response is {@code OK} whether or not the message
     * arrived; a timeout is only visible in the log. The wait relies on the
     * latch exposed by {@link Consumer#getLatch()}.
     *
     * @return the literal string {@code OK}
     * @throws Exception if the wait is interrupted
     */
    @RequestMapping("/kafka/send")
    @ResponseBody
    public String testReceiver() throws Exception {
        sender.sendMessage("helloworld.t", "Hello Spring Kafka!");
        boolean received = receiver.getLatch().await(10000, TimeUnit.MILLISECONDS);
        if (!received) {
            logger.warn("no message received from topic 'helloworld.t' within 10000 ms");
        }
        logger.info("{}", receiver.getLatch().getCount());
        return "OK";
    }
}
springboot启动类。例 2.7. Test Spring Kafka
SpringBootTest
package cn.netkiller;

import static org.assertj.core.api.Assertions.assertThat;

import java.util.concurrent.TimeUnit;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

import cn.netkiller.kafka.consumer.Consumer;
import cn.netkiller.kafka.producer.Producer;

/**
 * Integration test: sends one message through the {@link Producer} and
 * expects the {@link Consumer} latch to reach zero within ten seconds.
 */
@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringKafkaApplicationTests {

    @Autowired
    private Producer sender;

    @Autowired
    private Consumer receiver;

    public SpringKafkaApplicationTests() {
    }

    /**
     * Round trip over the {@code helloworld.t} topic.
     *
     * @throws Exception if the wait on the latch is interrupted
     */
    @Test
    public void testReceiver() throws Exception {
        sender.sendMessage("helloworld.t", "Hello Spring Kafka!");

        // wait at most ten seconds for the listener to count the latch down
        receiver.getLatch().await(10, TimeUnit.SECONDS);

        long remaining = receiver.getLatch().getCount();
        assertThat(remaining).isEqualTo(0);
    }
}
package schedule;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.domain.EntityScan;
import org.springframework.cloud.netflix.eureka.EnableEurekaClient;
import org.springframework.scheduling.annotation.EnableScheduling;

/**
 * Entry point of the schedule service: scheduling is enabled, the service is
 * a Eureka client, and entities are scanned from {@code common.domain}.
 */
@EntityScan("common.domain")
@EnableEurekaClient
@EnableScheduling
@SpringBootApplication
public class Application {

    /** Line printed to standard output before the Spring context starts. */
    private static final String STARTUP_BANNER = "Service Schedule Starting...";

    /**
     * Prints the startup line and boots the Spring context.
     *
     * @param args command-line arguments, passed through to Spring Boot
     */
    public static void main(String[] args) {
        System.out.println(STARTUP_BANNER);
        SpringApplication.run(Application.class, args);
    }
}
只需要两行,其余所有配置均放在配置中心。
# ==============================
# Service name and Eureka registry location.
# NOTE(review): the registry URL embeds basic-auth credentials; consider
# supplying them from the environment instead of committing them.
spring.application.name=schedule
eureka.client.serviceUrl.defaultZone=http://eureka:s3cr3t@172.16.0.10:8761/eureka/
# ==============================
java使用kafka、配置中心服务器相关配置
# Spring Cloud Config client settings: profile, label, server URI and credentials.
# NOTE(review): the password is stored in plain text; consider supplying it
# from the environment instead of committing it.
#spring.application.name=schedule
spring.cloud.config.profile=development
spring.cloud.config.label=master
spring.cloud.config.uri=http://172.16.0.10:8888
management.security.enabled=false
spring.cloud.config.username=cfg
spring.cloud.config.password=s3cr3t
使用 @EnableKafka 启用 Kafka 不需要其他@Bean等。这个配置文件可以省略,可以将 @EnableKafka 放到 Application.java 中。我还是喜欢独立配置。
package schedule.config; @Configuration @EnableKafka public class KafkaConfiguration { }
package schedule.task;

import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Date;
import java.util.List;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;
import org.springframework.web.client.RestTemplate;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;

import schedule.repository.CmsTrashRepository;
import schedule.repository.ArticleRepository;
import common.domain.Article;
import common.domain.CmsTrash;
import common.pojo.ResponseRestful;

/**
 * Scheduled tasks that push article inserts and deletions to Kafka as JSON.
 *
 * <p>Progress is tracked in Redis: an id position per task under
 * {@code schedule:CFPushTasks:PostionId:<task>} and, for deletions, a
 * timestamp position under {@code schedule:CFPushTasks:<task>}. The key names
 * (including the "Postion" spelling) are kept as they were so that positions
 * already stored in Redis remain valid.
 */
@Component
public class CFPushTasks {

    private static final Logger logger = LoggerFactory.getLogger(CFPushTasks.class);
    private static final String TOPIC = "test";
    private static final String DATE_PATTERN = "yyyy-MM-dd HH:mm:ss";
    private static final String DATE_KEY_FORMAT = "schedule:CFPushTasks:%s";
    private static final String ID_KEY_FORMAT = "schedule:CFPushTasks:PostionId:%s";
    /** Interval between two runs of each task, in milliseconds. */
    private static final long PUSH_INTERVAL_MS = 1000 * 50;
    private static final ObjectMapper mapper = new ObjectMapper();

    @Autowired
    private ArticleRepository articleRepository;

    @Autowired
    private CmsTrashRepository cmsTrashRepository;

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @Autowired
    private RedisTemplate<String, String> redisTemplate;

    @Value("${cf.cms.site_id}")
    private int siteId;

    public CFPushTasks() {
    }

    /**
     * SimpleDateFormat is not thread-safe, so a fresh instance is created per
     * use instead of sharing one static instance; this keeps the tasks safe
     * if a multi-threaded scheduler is configured.
     */
    private static SimpleDateFormat newDateFormat() {
        return new SimpleDateFormat(DATE_PATTERN);
    }

    /** @return the current time minus one minute */
    private Date getDate() {
        Calendar calendar = Calendar.getInstance();
        calendar.add(Calendar.MINUTE, -1);
        return calendar.getTime();
    }

    /**
     * Stores the timestamp position for {@code key} and verifies the write.
     *
     * <p>The stored text is read back under the same cache key and compared
     * as a string. The original passed the already-prefixed key to the getter
     * (which prefixed it again) and compared {@code Date} objects with
     * {@code ==}, so it could never report success.
     *
     * @param key   task name, e.g. {@code delete}
     * @param value timestamp to store; formatted with second precision
     * @return {@code true} if Redis returns exactly the value just written
     */
    private boolean setPostionDate(String key, Date value) {
        String cacheKey = String.format(DATE_KEY_FORMAT, key);
        String date = newDateFormat().format(value);
        logger.info("setPostion({},{})", cacheKey, date);
        redisTemplate.opsForValue().set(cacheKey, date);
        return date.equals(redisTemplate.opsForValue().get(cacheKey));
    }

    /**
     * Reads the timestamp position for {@code key}.
     *
     * @param key task name, e.g. {@code delete}
     * @return the stored timestamp, or {@code null} if none is stored or the
     *         stored text cannot be parsed
     */
    private Date getPostionDate(String key) {
        String cacheKey = String.format(DATE_KEY_FORMAT, key);
        // a single get() replaces hasKey()+get(), which could observe the key
        // disappearing between the two calls
        String stored = redisTemplate.opsForValue().get(cacheKey);
        Date date = null;
        if (stored != null) {
            try {
                date = newDateFormat().parse(stored);
            } catch (ParseException e) {
                logger.warn("cannot parse position '{}' stored under {}", stored, cacheKey, e);
            }
        }
        logger.debug("getPostion({}) => {}", cacheKey, date);
        return date;
    }

    /**
     * Stores the id position for {@code key} and verifies the write.
     *
     * <p>The read-back uses the task name, not the prefixed cache key; the
     * original prefixed the key twice and therefore always read back 0.
     *
     * @param key task name, e.g. {@code insert}
     * @param id  id of the last record that was pushed
     * @return {@code true} if Redis returns the id just written
     */
    private boolean setPostionId(String key, int id) {
        String cacheKey = String.format(ID_KEY_FORMAT, key);
        logger.info("setPostionId({},{})", cacheKey, id);
        redisTemplate.opsForValue().set(cacheKey, String.valueOf(id));
        return id == this.getPostionId(key);
    }

    /**
     * Reads the id position for {@code key}.
     *
     * @param key task name, e.g. {@code insert}
     * @return the stored id, or 0 if none is stored
     * @throws NumberFormatException if the stored text is not an integer; this
     *         is deliberately not swallowed, because falling back to 0 would
     *         push every record again from the beginning
     */
    private int getPostionId(String key) {
        String cacheKey = String.format(ID_KEY_FORMAT, key);
        String stored = redisTemplate.opsForValue().get(cacheKey);
        int id = (stored == null) ? 0 : Integer.parseInt(stored);
        logger.debug("getPostion({}) => {}", cacheKey, id);
        return id;
    }

    /**
     * Pushes every article of the configured site whose id is greater than
     * the stored position (or all of them when no position is stored) and
     * advances the position after each send. Stops early if the position
     * cannot be stored.
     */
    @Scheduled(fixedRate = PUSH_INTERVAL_MS)
    public void insert() {
        int id = this.getPostionId("insert");
        Iterable<Article> articles;
        if (id == 0) {
            articles = articleRepository.findBySiteId(this.siteId);
        } else {
            articles = articleRepository.findBySiteIdAndIdGreaterThan(this.siteId, id);
        }
        if (articles == null) {
            return;
        }
        for (Article article : articles) {
            ResponseRestful responseRestful = new ResponseRestful(true, this.getPostionId("insert"), "INSERT", article);
            try {
                String jsonString = mapper.writeValueAsString(responseRestful);
                this.send(TOPIC, jsonString);
                if (!this.setPostionId("insert", article.getId())) {
                    return;
                }
            } catch (JsonProcessingException e) {
                // skip this article and carry on with the next one
                logger.error("unable to serialize article id={}", article.getId(), e);
            }
        }
    }

    /** Sends the fixed message {@code Hello} to the topic. */
    @Scheduled(fixedRate = PUSH_INTERVAL_MS)
    public void update() {
        String message = "Hello";
        this.send(TOPIC, message);
    }

    /**
     * Pushes every trash record of type {@code delete} for the configured
     * site that is newer than the stored timestamp position (or all of them
     * when no position is stored), advancing both the id and the timestamp
     * position after each send. Stops early if the timestamp position cannot
     * be stored.
     */
    @Scheduled(fixedRate = PUSH_INTERVAL_MS)
    public void delete() {
        Date date = this.getPostionDate("delete");
        Iterable<CmsTrash> cmsTrashs;
        if (date == null) {
            cmsTrashs = cmsTrashRepository.findBySiteIdAndTypeOrderByCtime(this.siteId, "delete");
        } else {
            cmsTrashs = cmsTrashRepository.findBySiteIdAndTypeAndCtimeGreaterThanOrderByCtime(this.siteId, "delete", date);
        }
        if (cmsTrashs == null) {
            return;
        }
        for (CmsTrash cmsTrash : cmsTrashs) {
            ResponseRestful responseRestful = new ResponseRestful(true, this.getPostionId("delete"), "DELETE", cmsTrash);
            try {
                String jsonString = mapper.writeValueAsString(responseRestful);
                this.send(TOPIC, jsonString);
                this.setPostionId("delete", cmsTrash.getId());
                if (!this.setPostionDate("delete", cmsTrash.getCtime())) {
                    return;
                }
            } catch (JsonProcessingException e) {
                // skip this record and carry on with the next one
                logger.error("unable to serialize trash record id={}", cmsTrash.getId(), e);
            }
        }
    }

    /**
     * Sends {@code message} to {@code topic} asynchronously and logs the
     * result from a callback.
     */
    private void send(String topic, String message) {
        ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, message);
        future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
            @Override
            public void onSuccess(SendResult<String, String> result) {
                logger.debug("sent message='{}' with offset={}", message, result.getRecordMetadata().offset());
            }

            @Override
            public void onFailure(Throwable ex) {
                logger.error("unable to send message='{}'", message, ex);
            }
        });
    }

    /**
     * Posts {@code responseRestful} to a local HTTP endpoint and logs any
     * response body. Not called by the tasks in this class.
     */
    private void post(ResponseRestful responseRestful) {
        RestTemplate restTemplate = new RestTemplate();
        String response = restTemplate.postForObject("http://localhost:8440/test/cf/post.json", responseRestful, String.class);
        if (response != null) {
            logger.info(response);
        }
    }
}
springboot开发。
原文出处:Netkiller 系列 手札
本文作者:陈景峯
转载请与作者联系,同时请务必标明文章原始出处和作者信息及本声明。
版权声明:本站所有资料均为网友推荐收集整理而来,仅供学习和研究交流使用。
工作时间:8:00-18:00
客服电话
电子邮件
admin@qq.com
扫描二维码
获取最新动态