我们已经讨论了 SpringCloud 提供的基于微服务架构的许多特性。然而,我们一直在考虑基于 RESTful 的同步服务间通信。您可能还记得第一章、微服务简介中提到的其他一些流行的通信方式,如发布/订阅或异步、事件驱动的点对点消息传递。在本章中,我将介绍一种与前几章不同的微服务方法。我们将更详细地讨论如何使用 SpringCloudStream 来构建消息驱动的微服务。
本章将介绍的主题包括:
SpringCloudStream 构建在 SpringBoot 之上。它允许我们创建独立的生产级 Spring 应用程序,并使用 Spring 集成来帮助实现与消息代理的通信。使用 SpringCloudStream 创建的每个应用程序都通过输入和输出通道与其他微服务集成。这些通道通过特定于中间件的绑定器实现连接到外部消息代理。Kafka 和 Rabbit MQ 提供了两种内置绑定器实现。
Spring 集成扩展了 Spring 编程模型,支持著名的企业集成模式(EIP)。EIP 定义了许多组件,这些组件通常用于分布式系统中的编排。您可能听说过消息通道、路由器、聚合器或端点等模式。Spring 集成框架的主要目标是提供一个基于 EIP 构建 Spring 应用程序的简单模型。如果您对 EIP 的更多详细信息感兴趣,请访问网站http://www.enterpriseintegrationpatterns.com/patterns/messaging/toc.html 。
我认为,介绍 SpringCloud 流主要功能的最合适方式是通过基于微服务的系统示例。我们将稍微修改系统的体系结构,这在前面的章节中已经讨论过。让我简要回顾一下该体系结构。我们的系统负责处理订单。它由四个独立的微服务组成。order-service
微服务首先与product-service
通信以收集所选产品的详细信息,然后与customer-service
通信以检索客户及其账户的信息。现在,发送到order-service
的订单将被异步处理。仍然有一个公开的 restfulhttpapi 端点,用于客户端提交新订单,但应用程序不处理这些订单。它只保存新订单,将其发送给消息代理,然后向客户机响应订单已被批准处理。当前讨论的示例的主要目标是显示点对点通信,因此消息将仅由一个应用程序account-service
接收。下面是一个示意图,说明了示例系统体系结构:
account-service
收到新消息后,调用product-service
公开的方法,以了解其价格。它从账户中取款,然后以当前订单状态将响应发送回order-service
。该消息也通过消息代理发送。order-service
微服务接收消息并更新订单状态。如果外部客户端想要检查当前状态的订单,它可以调用端点公开带有订单详细信息的find
方法。示例应用程序的源代码可在 GitHub(上获得 https://github.com/piomin/sample-spring-cloud-messaging.git )。
在项目中包含 SpringCloudStream 的推荐方法是使用依赖关系管理系统。springcloudstream 有一个与整个 springcloud 框架相关的独立发布序列管理。但是,如果我们在dependencyManagement
部分的Edgware.RELEASE
版本中声明了spring-cloud-dependencies
,那么我们就不必在pom.xml
中声明任何其他内容。如果您希望仅使用 Spring Cloud Stream 项目,则应定义以下部分:
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-dependencies</artifactId>
<version>Ditmars.SR2</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
下一步是将spring-cloud-stream
添加到项目依赖项中。我还建议您至少包括spring-cloud-sleuth
库,以提供与通过 Zuul 网关传入order-service
的源请求相同的traceId
发送消息:
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-sleuth</artifactId>
</dependency>
要为应用程序启用到 MessageBroker 的连接,请使用@EnableBinding
注释主类。@EnableBinding
注释将一个或多个接口作为参数。您可以在 Spring Cloud Stream 提供的三个接口中进行选择:
Sink
:用于标记从入站通道接收消息的服务。Source
:用于向出站通道发送消息。Processor
:当您同时需要入站通道和出站通道时可以使用,因为它扩展了Source
和Sink
接口。因为order-service
既发送消息,也接收消息,所以它的主类被注释为@EnableBinding(Processor.class)
。下面是启用 Spring Cloud Stream 绑定的order-service
的主要类:
@SpringBootApplication
@EnableDiscoveryClient
@EnableBinding(Processor.class)
public class OrderApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(OrderApplication.class).web(true).run(args);
}
}
由于使用了 Spring 集成,该应用程序独立于项目中包含的 MessageBroker 实现。SpringCloudStream 自动检测并使用在类路径上找到的绑定器。这意味着我们可以选择不同类型的中间件,并使用相同的代码。所有特定于中间件的设置都可以通过 Spring Boot 支持的形式的外部配置属性来覆盖,例如应用程序参数、环境变量,或者仅通过application.yml
文件。正如我前面提到的,SpringCloudStream 为 Kafka 和 RabbitMQ 提供了绑定器实现。要包含对 Kafka 的支持,请将以下依赖项添加到项目中:
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-kafka</artifactId>
</dependency>
就个人而言,我更喜欢 RabbitMQ,但在本章中,我们将为 RabbitMQ 和 Kafka 创建一个示例。由于我们已经讨论了 RabbitMQ 的特性,我将从基于 RabbitMQ 的示例开始:
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
在启用 SpringCloudStream 并包括 binder 实现之后,我们可以创建发送者和监听器。让我们从负责向代理发送新订单消息的生产者开始。这是由order-service
中的OrderSender
实现的,它使用Output
bean 发送消息:
@Service
public class OrderSender {
@Autowired
private Source source;
public boolean send(Order order) {
return this.source.output().send(MessageBuilder.withPayload(order).build());
}
}
该 bean 由控制器调用,它公开了允许提交新订单的 HTTP 方法:
@RestController
public class OrderController {
private static final Logger LOGGER = LoggerFactory.getLogger(OrderController.class);
private ObjectMapper mapper = new ObjectMapper();
@Autowired
OrderRepository repository;
@Autowired
OrderSender sender;
@PostMapping
public Order process(@RequestBody Order order) throws JsonProcessingException {
Order o = repository.add(order);
LOGGER.info("Order saved: {}", mapper.writeValueAsString(order));
boolean isSent = sender.send(o);
LOGGER.info("Order sent: {}", mapper.writeValueAsString(Collections.singletonMap("isSent", isSent)));
return o;
}
}
包含订单信息的消息已发送到 message broker。现在,应该由account-service
接收。要实现这一点,我们必须声明接收方,它正在侦听传入 MessageBroker 上创建的队列的消息。为了接收带有订单数据的消息,我们只需要用@StreamListener
注释以Order
对象作为参数的方法:
@SpringBootApplication
@EnableDiscoveryClient
@EnableBinding(Processor.class)
public class AccountApplication {
@Autowired
AccountService service;
public static void main(String[] args) {
new SpringApplicationBuilder(AccountApplication.class).web(true).run(args);
}
@Bean
@StreamListener(Processor.INPUT)
public void receiveOrder(Order order) throws JsonProcessingException {
service.process(order);
}
}
现在您可以启动示例应用程序了。但是,有一个重要的细节尚未提及。这两个应用程序都试图连接本地主机上运行的 RabbitMQ,并且都将相同的交换视为输入或输出。这是一个问题,因为order-service
将消息发送到输出交换,而account-service
侦听传入其输入交换的消息。这是不同的交流,但首先要做的是。让我们从运行消息代理开始。
在前面的章节中,我们已经使用其 Docker 映像启动了 RabbitMQ 代理,因此值得提醒我们自己该命令。它使用 RabbitMQ 启动一个独立的 Docker 容器,该容器在端口5672
下可用,其 UI web 控制台在端口15672
下可用:
docker run -d --name rabbit -p 15672:15672 -p 5672:5672 rabbitmq:management
默认 RabbitMQ 地址应使用application.yml
文件中的spring.rabbit.*
属性覆盖:
spring:
rabbitmq:
host: 192.168.99.100
port: 5672
默认情况下,springcloudstream 创建一个主题交换来进行通信。这种类型的交换更适合发布/订阅交互模型。我们可以用exchangeType
属性覆盖它,如application.yml
的片段所示,如下所示:
spring:
cloud:
stream:
rabbit:
bindings:
output:
producer:
exchangeType: direct
input:
consumer:
exchangeType: direct
order-service
和account-service
应提供相同的配置设置。您不必手动创建任何 exchange。如果它不存在,则在启动期间由应用程序自动创建。否则,应用程序只绑定到该交换。默认情况下,它创建交换,为@Input
通道输入名称,为@Output
通道输出名称。这些名称可以被spring.cloud.stream.bindings.output.destination
和spring.cloud.stream.bindings.input.destination
属性覆盖,其中输入和输出是通道的名称。这个配置选项不仅是对 SpringCloudStream 特性的一个很好的补充,而且是用于在服务间通信中关联输入和输出目的地的关键设置。为什么会发生这种情况的解释很简单。在我们的示例中,order-service
是消息源应用程序,因此它将消息发送到输出通道。然后,另一方面,account-service
监听输入通道上的传入消息。如果order-service
输出通道和account-service
输入通道在代理上引用的目标不相同,则它们之间的通信将失败。最后,我决定使用名为orders-out
和orders-in
的目的地,我为order-service
提供了以下配置:
spring:
cloud:
stream:
bindings:
output:
destination: orders-out
input:
destination: orders-in
account-service
的类似配置设置相反:
spring:
cloud:
stream:
bindings:
output:
destination: orders-in
input:
destination: orders-out
两个应用程序启动后,您可以使用其 web 管理控制台(位于http://192.168.99.100:15672
(quest
/guest
)轻松查看 RabbitMQ 代理上声明的交换列表。下面是隐式创建的交换,您可以看到我们为测试目的创建的两个目的地:
默认情况下,Spring Cloud Stream 提供一个输入和一个输出消息通道。我们可以想象这样一种情况,即我们的系统需要为每种类型的消息通道提供多个目的地。让我们回到一个示例系统架构中,并考虑每一个订单都由两个其他微服务异步处理的情况。到目前为止,只有account-service
一直在监听order-service
传入的事件。在当前示例中,product-service
将是传入订单的接收者。其在该场景中的主要目标是管理可用产品的数量,并根据订单详细信息减少这些产品。它要求我们在order-service
内部定义两个输入和输出消息通道,因为我们仍然有基于直接 RabbitMQ 交换的点对点通信,其中每个消息可能只由一个使用者处理。
在这种情况下,我们应该使用@Input
和@Output
方法声明两个接口。每个方法都必须返回一个channel
对象。Spring Cloud Stream 为出站通信提供了两个可绑定的消息组件——MessageChannel
,为入站通信提供了其扩展SubscribableChannel
。以下是与product-service
交互的接口定义。已为与account-service
的消息传递创建了类似接口:
public interface ProductOrder {
@Input
SubscribableChannel productOrdersIn();
@Output
MessageChannel productOrdersOut();
}
下一步是通过用@EnableBinding(value={AccountOrder.class, ProductOrder.class}
注释应用程序的主类来激活应用程序的声明组件。现在,您可以在配置属性中使用这些通道的名称来引用它们,例如,spring.cloud.stream.bindings.productOrdersOut.destination=product-orders-in
。在使用@Input
和@Output
注释时,可以通过指定频道名称来定制每个频道名称,如下例所示:
public interface ProductOrder {
@Input("productOrdersIn")
SubscribableChannel ordersIn();
@Output("productOrdersOut")
MessageChannel ordersOut();
}
基于自定义接口声明,SpringCloudStream 将生成一个实现该接口的 bean。但是,仍然必须在负责发送消息的 bean 中访问它。与前面的示例相比,直接注入绑定通道会更舒适。下面是当前产品订单发送者的 bean 实现。bean 还有一个类似的实现,它向account-service
发送消息:
@Service
public class ProductOrderSender {
@Autowired
private MessageChannel output;
@Autowired
public SendingBean(@Qualifier("productOrdersOut") MessageChannel output) {
this.output = output;
}
public boolean send(Order order) {
return this.output.send(MessageBuilder.withPayload(order).build());
}
}
还应为目标服务提供每个消息通道自定义接口。侦听器应绑定到消息代理上的正确消息通道和目标:
@StreamListener(ProductOrder.INPUT)
public void receiveOrder(Order order) throws JsonProcessingException {
service.process(order);
}
您可能已经注意到,示例系统混合了不同风格的服务间通信。有些微服务使用典型的 RESTfulHTTPAPI,有些微服务使用 MessageBroker。在一个应用程序中混合不同的通信方式也没有异议。例如,您可以使用 Spring Cloud Stream 将spring-cloud-starter-feign
包含到项目中,并使用@EnableFeignClients
注释启用它。在我们的示例系统中,这两种不同的通信方式结合了account-service
,通过消息代理与order-service
集成,通过 REST API 与product-service
集成。以下是在account-service
模块内的外国客户机的product-service
实现:
@FeignClient(name = "product-service")
public interface ProductClient {
@PostMapping("/ids")
List<Product> findByIds(@RequestBody List<Long> ids);
}
还有其他好消息。感谢 Spring Cloud Sleuth,在通过网关传入系统的单个请求期间交换的所有消息都具有相同的traceId
。无论是同步 REST 通信还是异步消息传递,您都可以使用标准日志文件或日志聚合工具(如 Elastic Stack)轻松跟踪和关联微服务之间的日志。
我认为现在是运行和测试我们的示例系统的好时机。首先,我们必须使用mvn clean install
命令来构建整个项目。要使用两个微服务在两个不同的交换机上监听消息来访问代码示例,您应该切换到advanced
分支(https://github.com/piomin/sample-spring-cloud-messaging/tree/advanced )。您应该启动所有可用的应用程序,包括网关、发现和三个微服务(account-service
、order-service
、product-service
)。当前讨论的案例假设我们还使用其 Docker 容器启动了 RabbitMQ、Logstash、Elasticsearch 和 Kibana。有关如何使用 Docker 镜像在本地运行弹性堆栈的详细说明,请参阅第 9 章、分布式日志记录和跟踪。下图详细显示了系统的体系结构:
在运行所有必需的应用程序和工具之后,我们可以继续进行测试。以下是示例请求,可以通过 API 网关发送到order-service
:
curl -H "Content-Type: application/json" -X POST -d '{"customerId":1,"productIds":[1,3,4],"status":"NEW"}' http://localhost:8080/api/order
当我第一次使用按照前面章节中的描述配置的应用程序运行测试时,它不起作用。我可以理解,你们中的一些人可能有点困惑,因为通常它是在默认设置下测试的。为了使其正常运行,我还必须在application.yml
中添加以下属性:spring.cloud.stream.rabbit.bindings.output.producer.routingKeyExpression: '"#"'
。它将默认生产者的路由密钥设置为与应用程序启动期间自动创建的 exchange 路由密钥一致。在以下屏幕截图中,您可能会看到其中一个输出交换定义:
在前面描述的修改之后,测试应成功结束。微服务打印的日志通过traceId
相互关联。我在logback-spring.xml
中稍微修改了默认的 Sleuth 日志记录格式,现在就是这样配置的-%d{HH:mm:ss.SSS} %-5level [%X{X-B3-TraceId:-},%X{X-B3-SpanId:-}] %msg%n
。发送测试请求order-service
测试请求后,记录以下信息:
12:34:48.696 INFO [68038cdd653f7b0b,68038cdd653f7b0b] Order saved: {"id":1,"status":"NEW","price":0,"customerId":1,"accountId":null,"productIds":[1,3,4]}
12:34:49.821 INFO [68038cdd653f7b0b,68038cdd653f7b0b] Order sent: {"isSent":true}
如您所见,account-service
也使用相同的日志格式,并打印与order-service
相同的traceId
:
12:34:50.079 INFO [68038cdd653f7b0b,23432d962ec92f7a] Order processed: {"id":1,"status":"NEW","price":0,"customerId":1,"accountId":null,"productIds":[1,3,4]}
12:34:50.332 INFO [68038cdd653f7b0b,23432d962ec92f7a] Account found: {"id":1,"number":"1234567890","balance":50000,"customerId":1}
12:34:52.344 INFO [68038cdd653f7b0b,23432d962ec92f7a] Products found: [{"id":1,"name":"Test1","price":1000},{"id":3,"name":"Test3","price":2000},{"id":4,"name":"Test4","price":3000}]
可以使用弹性堆栈聚合单个事务期间生成的所有日志。您可以通过X-B3-TraceId
字段过滤条目,例如9da1e5c83094390d
:
创建 SpringCloudStream 项目的主要动机实际上是支持持久发布/订阅模型。在前面的部分中,我们讨论了微服务之间的点对点通信,这只是一个附加功能。然而,无论我们决定使用点对点还是发布/订阅模型,编程模型仍然是相同的。
在发布/订阅通信中,数据通过共享主题进行广播。它降低了生产者和消费者的复杂性,并允许在不改变流的情况下轻松地将新应用程序添加到现有拓扑中。在最后一个系统示例中可以清楚地看到这一点,我们决定添加第二个应用程序,该应用程序使用源微服务生成的事件。与最初的体系结构相比,我们必须为每个目标应用程序定义专用的定制消息通道。通过队列进行直接通信,消息只能由一个应用程序实例使用,因此,解决方案是必要的。发布/订阅模型的使用简化了该体系结构。
发布/订阅模型的示例应用程序的开发比点对点通信更简单。我们不必覆盖任何默认消息通道来启用与多个接收器的交互。与说明了向单个目标应用程序(account-service
发送消息的初始示例相比,我们只需要稍微修改配置设置。因为 Spring Cloud Stream 默认绑定到主题,所以我们不必为输入消息通道重写exchangeType
。正如您在下面的配置片段中所看到的,我们在向order-service
发送响应时仍然使用点对点通信。如果我们真的想一想,这是有道理的。order-service
微服务发送account-service
和product-service
都必须接收的消息,而它们的响应只发送给order-service
:
spring:
application:
name: product-service
rabbitmq:
host: 192.168.99.100
port: 5672
cloud:
stream:
bindings:
output:
destination: orders-in
input:
destination: orders-out
rabbit:
bindings:
output:
producer:
exchangeType: direct
routingKeyExpression: '"#"'
product-service
的主要处理方法的逻辑非常简单。只需从收到的订单中找到所有的productIds
,更改每个订单的存储产品数量,然后将响应发送到order-service
:
@Autowired
ProductRepository productRepository;
@Autowired
OrderSender orderSender;
public void process(final Order order) throws JsonProcessingException {
LOGGER.info("Order processed: {}", mapper.writeValueAsString(order));
for (Long productId : order.getProductIds()) {
Product product = productRepository.findById(productId);
if (product.getCount() == 0) {
order.setStatus(OrderStatus.REJECTED);
break;
}
product.setCount(product.getCount() - 1);
productRepository.update(product);
LOGGER.info("Product updated: {}", mapper.writeValueAsString(product));
}
if (order.getStatus() != OrderStatus.REJECTED) {
order.setStatus(OrderStatus.ACCEPTED);
}
LOGGER.info("Order response sent: {}", mapper.writeValueAsString(Collections.singletonMap("status", order.getStatus())));
orderSender.send(order);
}
要访问当前样本,您只需切换到publish_subscribe
分支,可在获得 https://github.com/piomin/sample-spring-cloud-messaging/tree/publish_subscribe 。然后,您应该构建父项目并像前面的示例一样运行所有服务。如果您想测试它,那么在您只有一个account-service
和product-service
的运行实例之前,所有这些都可以正常工作。让我们讨论一下那个问题
当谈到基于微服务的体系结构时,可伸缩性总是作为其主要优势之一。通过创建给定应用程序的多个实例来扩展系统的能力非常重要。执行此操作时,应用程序的不同实例被放置在竞争消费者关系中,其中只有一个实例需要处理给定的消息。对于点对点通信来说,这不是问题,但在发布-订阅模型中,消息被所有接收者使用,这可能是一个挑战。
提高 microservice 实例数量的可用性是 SpringCloudStream 的主要概念之一。然而,这个想法背后并没有魔力。使用 SpringCloudStream,运行一个应用程序的多个实例非常容易。其中一个原因是来自消息代理的本地支持,消息代理旨在处理许多消费者和大量流量。
在我们的例子中,所有消息传递微服务也公开 RESTfulHTTPAPI,因此首先,我们必须自定义每个实例的服务器端口。我们以前做过这样的手术。我们也可以考虑设置两个 SpringCloud 流属性,AutoT0 和 T1。多亏了它们,microservice 的每个实例都能够接收到关于同一应用程序的其他实例的启动数量以及它自己的实例索引的信息。仅当您希望启用分区功能时,才需要正确配置这些属性。稍后我将进一步讨论这个机制。现在,让我们看一下放大后的应用程序的配置设置。account-service
和product-service
都定义了两个配置文件,用于运行应用程序的多个实例。我们在那里定制了服务器的 HTTP 端口、编号和实例索引:
---
spring:
profiles: instance1
cloud:
stream:
instanceCount: 2
instanceIndex: 0
server:
port: ${PORT:8091}
---
spring:
profiles: instance2
cloud:
stream:
instanceCount: 2
instanceIndex: 1
server:
port: ${PORT:9091}
构建父项目后,可以运行应用程序的两个实例。它们中的每一个都使用分配给启动期间传递的正确配置文件的属性进行初始化,例如,java -jar --spring.profiles.active=instance1 target/account-service-1.0-SNAPSHOT.jar
。如果您向order-service
端点POST /
发送测试请求,新订单将转发到 RabbitMQ 主题交换,以便连接到该交换的account-service
和product-service
都能接收到。问题在于,每个服务的所有实例都会接收到消息,这并不是我们想要实现的。在这里,分组机制提供了帮助。
我们的目的是明确的。我们有许多微服务使用来自同一主题的消息。应用程序的不同实例位于相互竞争的消费者关系中,但其中只有一个实例应处理给定的消息。SpringCloudStream 引入了消费者群体的概念,对这种行为进行建模。要激活这样的行为,我们应该设置一个名为spring.cloud.stream.bindings.<channelName>.group
的属性,并使用组名。设置后,订阅给定目的地的所有组都将接收已发布数据的副本,但每个组中只有一个成员接收并处理来自该目的地的消息。在我们的例子中,有两组。首先,对于所有具有名称帐户的account-service
实例,第二,对于具有名称产品的product-service
实例
以下是account-service
当前的绑定配置。orders-in
目的地是为与order-service
直接通信而创建的队列,因此只有orders-out
按服务名称分组。已为product-service
准备了类似配置:
spring:
cloud:
stream:
bindings:
output:
destination: orders-in
input:
destination: orders-out
group: account
第一个不同之处在于为 RabbitMQ 交换自动创建的队列的名称。现在,它不是随机生成的名称,例如orders-in.anonymous.qNxjzDq5Qra-yqHLUv50PQ
,而是由目的地和组名组成的确定字符串。以下屏幕截图显示 RabbitMQ 上当前存在的所有队列:
您可以自行执行重新测试,以验证消息是否仅由同一组中的一个应用程序接收。但是,您不确定哪个实例将处理传入消息。为了确定这一点,可以使用分区机制。
SpringCloudStream 支持在应用程序的多个实例之间对数据进行分区。在典型用例中,目的地被视为被划分为不同的分区。每个生产者在发送由多个使用者实例接收的消息时,确保数据由配置的字段标识,以强制同一使用者实例进行处理。
要为应用程序启用分区功能,必须在生产者配置设置中定义partitionKeyExpression
或partitionKeyExtractorClass
属性和partitionCount
。以下是可能为您的应用程序提供的示例配置:
spring.cloud.stream.bindings.output.producer.partitionKeyExpression=payload.customerId
spring.cloud.stream.bindings.output.producer.partitionCount=2
分区机制还需要在用户端设置spring.cloud.stream.instanceCount
和spring.cloud.stream.instanceIndex
属性。它还必须显式启用,spring.cloud.stream.bindings.input.consumer.partitioned
属性设置为true
。实例索引负责标识特定实例从中接收数据的唯一分区。一般来说,生产者方的partitionCount
和消费者方的instanceCount
应该相等。
让我让您熟悉 SpringCloudStream 提供的分区机制。首先,它基于partitionKeyExpression
计算分区密钥,该分区密钥根据出站消息或PartitionKeyExtractorStrategy
接口的实现进行评估,该接口定义了提取消息密钥的算法。一旦计算出消息密钥,目标分区将被确定为介于零和partitionCount - 1
之间的值。默认计算公式为key.hashCode() % partitionCount
。它可以通过partitionSelectorExpression
属性进行定制,也可以通过创建org.springframework.cloud.stream.binder.PartitionSelectorStrategy
接口的实现进行定制。计算出的密钥与用户端的instanceIndex
匹配。
我认为关于分区的主要概念已经解释过了。让我们继续讨论样本。以下是product-service
输入通道的当前配置(与account-service
设置的账户组名称相同):
spring:
cloud:
stream:
bindings:
input:
consumer:
partitioned: true
destination: orders-out
group: product
每个微服务都有两个运行实例,它们使用来自主题交换的数据。order-service
中还为生产者设置了两个分区。根据Order
对象的customerId
字段计算消息键。索引为0
的分区专用于customerId
字段中的偶数订单,索引为1
的分区专用于customerId
字段中的奇数订单
事实上,RabbitMQ 不支持分区。SpringCloudStream 是如何使用 RabbitMQ 实现分区过程的,这很有趣。下面的屏幕截图演示了在 RabbitMQ 中创建的交换绑定列表。如您所见,有两个已为 exchange 定义的路由密钥-orders-out-0
和orders-out-1
:
如果您在 JSON 消息中发送一个customerId
等于 1 的订单,例如{"customerId": 1,"productIds": [4],"status": "NEW"}
,它将始终由一个instanceIndex=1
实例处理。它可以在应用程序日志中签出,也可以使用 RabbitMQ web 控制台签出。下面是每个队列的消息速率图,其中带有customerId=1
的消息已发送多次:
SpringCloudStream 配置设置可以使用 SpringBoot 支持的任何机制来覆盖,例如应用程序参数、环境变量以及 YAML 或属性文件。它定义了许多可应用于所有绑定器的通用配置选项。但是,对于应用程序使用的特定 MessageBroker,还有一些特定的附加属性。
当前属性组应用于整个 Spring Cloud Stream 应用程序。以下所有属性的前缀均为spring.cloud.stream
:
名称 | 默认值 | 描述 |
---|---|---|
instanceCount |
1 |
应用程序正在运行的实例数。更多详情请参见缩放和分组部分。 |
instanceIndex |
0 |
应用程序实例的索引。有关更多详细信息,请参阅缩放和分组部分。 |
dynamicDestinations |
- | 可以动态绑定的目的地列表。 |
defaultBinder |
- | 如果定义了多个活页夹,则默认活页夹。有关详细信息,请参阅多活页夹部分。 |
overrideCloudConnectors |
false |
仅当云处于活动状态且在类路径上找到 SpringCloud 连接器时,才使用此选项。当设置为true 时,绑定器完全忽略绑定服务,依赖spring.rabbitmq.* 或spring.kafka.* Spring Boot 属性。 |
下一组属性与消息通道相关。在 SpringCloud 命名法中,这些是绑定属性。它们只能分配给消费者、生产者或同时分配给两者。以下是属性列表及其默认值和说明:
| 名称 | 默认值 | 描述 |
| destination
| - | 代理上为消息通道配置的目标目标名称。如果通道仅由一个使用者使用,则可以将其指定为以逗号分隔的目的地列表。 |
| group
| null
| 渠道的消费者群体。详见缩放分组部分。 |
| contentType
| null
| 通过给定通道交换的消息的内容类型。例如,我们可以将其设置为application/json
。然后,从该应用程序发送的所有对象将自动转换为 JSON 字符串。 |
| binder
| null
| 频道使用的默认活页夹。有关详细信息,请参阅多活页夹部分。 |
以下属性列表仅适用于输入绑定,并且必须以spring.cloud.stream.bindings.<channelName>.consumer
作为前缀。我将指出其中最重要的一点:
名称 | 默认值 | 说明 |
---|---|---|
concurrency |
1 |
每个输入通道的用户数 |
partitioned |
false |
它允许从分区生产者接收数据 |
headerMode |
embeddedHeaders |
如果设置为raw ,则禁用对输入的头解析 |
maxAttempts |
3 |
消息处理失败时的重试次数。将此选项设置为1 将禁用重试机制 |
以下绑定属性仅适用于输出绑定,并且必须以spring.cloud.stream.bindings.<channelName>.producer
作为前缀。我还将仅指出其中最重要的部分:
| 名称 | 默认值 | 说明 |
| requiredGroups
| - | 必须在 MessageBroker 上创建的以逗号分隔的组列表 |
| headerMode
| embeddedHeaders
| 如果设置为raw
,则禁用对输入的头解析 |
| useNativeEncoding
| false
| 如果设置为true
,则出站消息由客户端库直接序列化 |
| errorChannelEnabled
| false
| 如果设置为true
,则向目的地的错误通道发送失败消息 |
介绍了 SpringCloud 流编程模型的基础知识,以及点对点和发布/订阅通信示例。让我们讨论一些更高级的示例功能。
在本章介绍的所有示例中,我们已经通过 RESTfulAPI 发送了用于测试目的的订单。但是,通过在应用程序中定义消息源,我们可以轻松地创建一些测试数据。下面是一个 bean,它使用@Poller
每秒生成一条消息并将其发送到输出通道:
@Bean
@InboundChannelAdapter(value = Source.OUTPUT, poller = @Poller(fixedDelay = "1000", maxMessagesPerPoll = "1"))
public MessageSource<Order> ordersSource() {
Random r = new Random();
return () -> new GenericMessage<>(new Order(OrderStatus.NEW, (long) r.nextInt(5), Collections.singletonList((long) r.nextInt(10))));
}
您可能还记得,account-service
和product-service
一直在从order-service
接收事件,然后发送回响应消息。我们已经创建了OrderSender
bean,它负责准备响应负载并将其发送到输出通道。事实证明,如果我们在方法中返回响应对象并用@SentTo
注释它,实现可能会更简单:
@StreamListener(Processor.INPUT)
@SendTo(Processor.OUTPUT)
public Order receiveAndSendOrder(Order order) throws JsonProcessingException {
LOGGER.info("Order received: {}", mapper.writeValueAsString(order));
return service.process(order);
}
我们甚至可以想象这样一个实现,比如下面,而不使用@StreamListener
。transformer 模式负责更改对象的形式。在这种情况下,它修改了两个order
字段—status
和price
:
@EnableBinding(Processor.class)
public class OrderProcessor {
@Transformer(inputChannel = Processor.INPUT, outputChannel = Processor.OUTPUT)
public Order process(final Order order) throws JsonProcessingException {
LOGGER.info("Order processed: {}", mapper.writeValueAsString(order));
// ...
products.forEach(p -> order.setPrice(order.getPrice() + p.getPrice()));
if (order.getPrice() <= account.getBalance()) {
order.setStatus(OrderStatus.ACCEPTED);
account.setBalance(account.getBalance() - order.getPrice());
} else {
order.setStatus(OrderStatus.REJECTED);
}
return order;
}
}
假设我们希望以不同的方式处理传入同一消息通道的消息,我们可以使用条件分派。Spring Cloud Stream 支持根据条件向在输入通道上注册的多个@StreamListener
方法发送消息。该条件是在@StreamListener
注释的condition
属性中定义的Spring 表达式语言(SpEL表达式):
public boolean send(Order order) {
Message<Order> orderMessage = MessageBuilder.withPayload(order).build();
orderMessage.getHeaders().put("processor", "account");
return this.source.output().send(orderMessage);
}
下面是一个示例实现,它定义了两个用@StreamListener
注释的方法,它们侦听同一主题。其中一个专用于从account-service
传入的消息,而第二个专用于product-service
。发送传入消息时,将根据其具有processor
名称的标头:
@SpringBootApplication
@EnableDiscoveryClient
@EnableBinding(Processor.class)
public class OrderApplication {
@StreamListener(target = Processor.INPUT, condition = "headers['processor']=='account'")
public void receiveOrder(Order order) throws JsonProcessingException {
LOGGER.info("Order received from account: {}", mapper.writeValueAsString(order));
// ...
}
@StreamListener(target = Processor.INPUT, condition = "headers['processor']=='product'")
public void receiveOrder(Order order) throws JsonProcessingException {
LOGGER.info("Order received from product: {}", mapper.writeValueAsString(order));
// ...
}
}
在讨论 SpringCloud 与消息代理的集成时,我已经多次提到 ApacheKafka。然而,到目前为止,我们还没有运行任何基于该平台的示例。事实上,RabbitMQ 往往是使用 SpringCloud 项目时的首选,但 Kafka 也值得我们注意。与 RabbitMQ 相比,它的优势之一是对分区的本地支持,这是 SpringCloudStream 最重要的特性之一。
卡夫卡不是一个典型的消息代理。它是一个分布式流媒体平台。它的主要功能是允许您发布和订阅记录流。它对于转换或响应数据流的实时流应用程序特别有用。它通常作为由一个或多个服务器组成的集群运行,并在主题中存储记录流。
不幸的是,ApacheKafka 没有正式的 Docker 图像。但是,我们可以使用非官方的,例如 Spotify 共享的。与其他可用的 Kafka docker 图像相比,此图像在同一容器中同时运行 Zookeeper 和 Kafka。下面是 Docker 命令,它启动 Kafka 并在端口9092
上公开它。Zookeeper 也可在外部端口2181
上使用:
docker run -d --name kafka -p 2181:2181 -p 9092:9092 --env ADVERTISED_HOST=192.168.99.100 --env ADVERTISED_PORT=9092 spotify/kafka
要为应用程序启用 ApacheKafka,请将spring-cloud-starter-stream-kafka
启动器包含到依赖项中。我们当前的示例与发布/订阅模型第节中介绍的发布/订阅示例非常相似,该示例使用 RabbitMQ 发布/订阅进行分组和分区。唯一的区别在于依赖项和配置设置。
SpringCloudStream 自动检测并使用在类路径上找到的绑定器。连接设置可能会被spring.kafka.*
属性覆盖。在本例中,我们只需要将自动配置的 Kafka 客户端地址更改为 Docker 机器地址192.168.99.100
。对于 Kafka 客户端使用的 Zookeeper,应执行相同的修改:
spring:
application:
name: order-service
kafka:
bootstrap-servers: 192.168.99.100:9092
cloud:
stream:
bindings:
output:
destination: orders-out
producer:
partitionKeyExpression: payload.customerId
partitionCount: 2
input:
destination: orders-in
kafka:
binder:
zkNodes: 192.168.99.100
启动 discovery、gateway 和所有所需的 MicroService 实例后,您可以执行与前面示例相同的测试。如果所有配置都正确,那么在应用程序启动期间,您应该在日志中看到以下片段。测试结果与基于 RabbitMQ 的样本完全相同:
16:58:30.008 INFO [,] Discovered coordinator 192.168.99.100:9092 (id: 2147483647 rack: null) for group account.
16:58:30.038 INFO [,] Successfully joined group account with generation 1
16:58:30.039 INFO [,] Setting newly assigned partitions [orders-out-0, orders-out-1] for group account
16:58:30.081 INFO [,] partitions assigned:[orders-out-0, orders-out-1]
Spring Cloud Stream Kafka 提供了专门为 Kafka Streams 绑定设计的活页夹。使用此绑定器,应用程序可以利用 Kafka Streams API。要为您的应用程序启用此功能,请包含对项目的以下依赖项:
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kstream</artifactId>
</dependency>
Kafka Streams API 提供高级流 DSL。可以通过声明以KStream
接口为参数的@StreamListener
方法进行访问。KStream 为流操作提供了一些有用的方法,这些方法在其他流 API(如map
、flatMap
、join
或filter
)中很有名。还有一些卡夫卡流特有的其他方法,如to(...)
(用于向主题发送流)或through(...)
(与to
相同,但也会从主题创建KStream
的新实例):
@SpringBootApplication
@EnableBinding(KStreamProcessor.class)
public class AccountApplication {
@StreamListener("input")
@SendTo("output")
public KStream<?, Order> process(KStream<?, Order> input) {
// ..
}
public static void main(String[] args) {
SpringApplication.run(AccountApplication.class, args);
}
}
在讨论示例应用程序的实现时,之前已经介绍了 Kafka 的一些 SpringCloud 配置设置。下面是一个包含最重要属性的表,可以为定制 ApacheKafka 活页夹设置这些属性。所有这些属性的前缀为spring.cloud.stream.kafka.binder
:
名称 | 默认值 | 描述 |
---|---|---|
brokers |
localhost |
包含或不包含端口信息的代理的逗号分隔列表。 |
defaultBrokerPort |
9092 |
如果没有使用brokers 属性定义端口,则设置默认端口。 |
zkNodes |
localhost |
以逗号分隔的 ZooKeeper 节点列表,包含或不包含端口信息。 |
defaultZkPort |
2181 |
如果没有使用zkNodes 属性定义端口,则设置默认 ZooKeeper 端口。 |
configuration |
- | Kafka 客户端属性的键/值映射。它适用于活页夹创建的所有客户端。 |
headers |
- | 将由活页夹转发的自定义标题列表。 |
autoCreateTopics |
true |
如果设置为true ,活页夹将自动创建新主题。 |
autoAddPartitions |
false |
如果设置为true ,绑定器将自动创建新分区。 |
在 SpringCloud 流命名法中,可以实现的接口被称为绑定器,用于在外部中间件处提供与物理目的地的连接。目前,有两种可用的内置绑定器实现 Kafka 和 RabbitMQ。如果您想提供一个定制的活页夹库,那么将输入和输出连接到外部中间件的策略的关键接口是Binder
,它有两种方法—bindConsumer
和bindProducer
。有关更多详细信息,您可以参考 Spring Cloud Stream 规范。
对我们来说,重要的是能够在一个应用程序中使用多个绑定。您甚至可以混合使用不同的实现,例如,RabbitMQ 和 Kafka。SpringCloudStream 依赖于 SpringBoot 在绑定过程中的自动配置。类路径上可用的实现将自动使用。如果要同时使用两种默认绑定,请将以下依赖项包含到项目中:
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-rabbit</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>
如果在类路径中找到了多个绑定器,则应用程序必须检测哪些绑定器应用于特定的通道绑定。我们可以使用spring.cloud.stream.defaultBinder
属性全局配置默认绑定器,也可以使用spring.cloud.stream.bindings.<channelName>.binder
属性为每个通道单独配置默认绑定器。现在,我们回到我们的示例,在那里配置多个绑定器。我们为account-service
和order-service
之间的直接通信定义 RabbitMQ,为order-service
和其他微服务之间的发布/订阅模型定义 Kafka。
这是与在publish_subscribe
分支(中为account-service
提供的配置相同的配置 https://github.com/piomin/sample-spring-cloud-messaging/tree/publish_subscribe ),但基于两种不同的粘合剂:
spring:
cloud:
stream:
bindings:
output:
destination: orders-in
binder: rabbit1
input:
consumer:
partitioned: true
destination: orders-out
binder: kafka1
group: account
rabbit:
bindings:
output:
producer:
exchangeType: direct
routingKeyExpression: '"#"'
binders:
rabbit1:
type: rabbit
environment:
spring:
rabbitmq:
host: 192.168.99.100
kafka1:
type: kafka
environment:
spring:
kafka:
bootstrap-servers: 192.168.99.100:9092
与所有其他 SpringCloud 项目相比,SpringCloudStream 可以被视为一个单独的类别。它通常与其他项目相关联,目前由关键的 SpringCloud 数据流大力推动。这是一个用于构建数据集成和实时数据处理管道的工具包。然而,这是一个巨大的主题,而不是一本单独的书的讨论主题
更重要的是,SpringCloudStream 提供了对异步消息传递的支持,可以使用 Spring 注释样式轻松实现异步消息传递。我认为,对于你们中的一些人来说,这种服务间通信方式并不像 RESTfulAPI 模型那样明显。因此,我重点向您展示了使用 SpringCloudStream 进行点对点和发布/订阅通信的示例。我还描述了这两种消息传递方式之间的差异
发布/订阅模型并不是什么新鲜事,但多亏了 SpringCloudStream,它可以很容易地包含到基于微服务的系统中。本章还介绍了一些关键概念,如消费群体或分区。阅读后,您应该能够基于消息传递模型实现微服务,并将其与其他 SpringCloud 库集成,以便提供日志记录、跟踪,或者只是将其作为现有基于 REST 的微服务系统的一部分进行部署。
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。