皇家塔楼防御战斗
35.56M · 2026-02-13
响应式流标准(Reactive Streams)包含四个核心接口:
Publisher:数据发布者Subscriber:数据订阅者Subscription:订阅关系Processor:既是 Publisher 又是 SubscriberReactor 中常用类型:
Mono<T>:0 或 1 个元素Flux<T>:0 到 N 个元素基本示例:
import reactor.core.publisher.Mono;
import reactor.core.publisher.Flux;
import java.util.Optional;
public class ReactorBasics {
// Mono:0 或 1 个元素
public static Mono<String> monoExample() {
return Mono.just("Hello")
.map(String::toUpperCase)
.doOnNext(s -> System.out.println("Mono: " + s))
.doOnSuccess(s -> System.out.println("Mono 完成"));
}
// Flux:0~N 个元素
public static Flux<Integer> fluxExample() {
return Flux.range(1, 5)
.map(i -> i * 2)
.doOnNext(i -> System.out.println("Flux: " + i))
.doOnComplete(() -> System.out.println("Flux 完成"));
}
// 从 Optional 创建 Mono
public static Mono<String> fromOptional(Optional<String> opt) {
return Mono.justOrEmpty(opt);
}
// 背压示例
public static void backpressureExample() {
Flux.range(1, 100)
.log()
.limitRate(10) // 每次最多请求 10 个
.subscribe(
i -> System.out.println("onNext: " + i),
err -> err.printStackTrace(),
() -> System.out.println("完成")
);
}
// 错误处理:onErrorResume / onErrorReturn
public static Mono<String> errorHandlingExample() {
return Mono.<String>error(new RuntimeException("模拟错误"))
.onErrorResume(e -> {
System.out.println("发生错误: " + e.getMessage());
return Mono.just("降级值");
})
.onErrorReturn("默认值");
}
}
import reactor.core.publisher.Mono;
import reactor.core.publisher.Flux;
import java.time.Duration;
import java.util.List;
public class ReactorOperators {
// map:同步转换
public static Flux<String> mapExample() {
return Flux.just("a", "b", "c")
.map(String::toUpperCase);
}
// flatMap:异步转换
public static Flux<String> flatMapExample() {
return Flux.just(1, 2, 3)
.flatMap(i ->
Mono.just("item-" + i)
.delayElement(Duration.ofMillis(100))
);
}
// zip:组合多个 Publisher
public static Mono<String> zipExample() {
Mono<String> first = Mono.just("Hello");
Mono<String> second = Mono.just("Reactor");
return Mono.zip(first, second)
.map(tuple -> tuple.getT1() + " " + tuple.getT2());
}
// then:只关心前面完成信号,不关心值
public static Mono<Void> thenExample() {
return Mono.just("step1")
.doOnNext(System.out::println)
.then(Mono.just("step2").doOnNext(System.out::println))
.then();
}
// merge / concat:合并流
public static Flux<Integer> mergeExample() {
Flux<Integer> even = Flux.just(2, 4, 6);
Flux<Integer> odd = Flux.just(1, 3, 5);
return Flux.merge(even, odd);
}
public static Flux<Integer> concatExample() {
Flux<Integer> first = Flux.just(1, 2, 3);
Flux<Integer> second = Flux.just(4, 5, 6);
return Flux.concat(first, second);
}
// buffer:缓冲
public static Flux<List<Integer>> bufferExample() {
return Flux.range(1, 10).buffer(3); // [1,2,3], [4,5,6], [7,8,9], [10]
}
}
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;
public class ReactorSchedulers {
// subscribeOn:影响数据源执行线程
public static Flux<Integer> subscribeOnBoundedElastic() {
return Flux.range(1, 5)
.subscribeOn(Schedulers.boundedElastic())
.map(i -> {
System.out.println("subscribeOn 线程: " + Thread.currentThread().getName());
return i;
});
}
// publishOn:影响后续操作符执行线程
public static Flux<Integer> publishOnParallel() {
return Flux.range(1, 5)
.publishOn(Schedulers.parallel())
.map(i -> {
System.out.println("publishOn 线程: " + Thread.currentThread().getName());
return i;
});
}
// 组合使用
public static Flux<Integer> mixedSchedulers() {
return Flux.range(1, 5)
.subscribeOn(Schedulers.boundedElastic())
.map(i -> i * 2)
.publishOn(Schedulers.parallel())
.map(i -> i + 1);
}
}
(以 Maven 为例)
<!-- 引入 WebFlux 与 R2DBC -->
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-r2dbc</artifactId>
</dependency>
</dependencies>
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class WebFluxApplication {
public static void main(String[] args) {
SpringApplication.run(WebFluxApplication.class, args);
}
}
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.*;
import org.springframework.web.server.ResponseStatusException;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Flux;
import javax.validation.Valid;
import java.time.Duration;
@RestController
@RequestMapping("/api/reactive")
public class ReactiveUserController {
private final ReactiveUserService userService;
public ReactiveUserController(ReactiveUserService userService) {
this.userService = userService;
}
@GetMapping("/users/{id}")
public Mono<UserDTO> getById(@PathVariable Long id) {
return userService.findById(id)
.switchIfEmpty(Mono.error(
new ResponseStatusException(HttpStatus.NOT_FOUND, "用户不存在")
));
}
@GetMapping("/users")
public Flux<UserDTO> list(
@RequestParam(defaultValue = "0") int page,
@RequestParam(defaultValue = "10") int size) {
return userService.findAll(page, size);
}
@PostMapping("/users")
public Mono<UserDTO> create(@RequestBody @Valid UserCreateRequest request) {
return userService.create(request);
}
// SSE:流式推送用户数据
@GetMapping(value = "/users/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<UserDTO> stream() {
return userService.findAllStream()
.delayElements(Duration.ofSeconds(1)); // 模拟每秒推送一条
}
}
import org.springframework.data.domain.PageRequest;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Flux;
@Service
public class ReactiveUserService {
private final ReactiveUserRepository repository;
public ReactiveUserService(ReactiveUserRepository repository) {
this.repository = repository;
}
public Mono<UserDTO> findById(Long id) {
return repository.findById(id).map(this::toDTO);
}
public Flux<UserDTO> findAll(int page, int size) {
return repository
.findAllBy(PageRequest.of(page, size))
.map(this::toDTO);
}
public Mono<UserDTO> create(UserCreateRequest request) {
User entity = new User();
entity.setUsername(request.getUsername());
entity.setEmail(request.getEmail());
// 其他字段...
return repository.save(entity).map(this::toDTO);
}
public Flux<UserDTO> findAllStream() {
return repository.findAll().map(this::toDTO);
}
private UserDTO toDTO(User user) {
UserDTO dto = new UserDTO();
dto.setId(user.getId());
dto.setUsername(user.getUsername());
dto.setEmail(user.getEmail());
return dto;
}
}
import org.springframework.data.domain.Pageable;
import org.springframework.data.repository.reactive.ReactiveCrudRepository;
import reactor.core.publisher.Flux;
public interface ReactiveUserRepository extends ReactiveCrudRepository<User, Long> {
Flux<User> findAllBy(Pageable pageable);
}
import org.springframework.http.HttpHeaders;
import org.springframework.http.MediaType;
import org.springframework.http.HttpStatusCode;
import org.springframework.stereotype.Service;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Flux;
@Service
public class ReactiveOrderClient {
private final WebClient webClient;
public ReactiveOrderClient(WebClient.Builder builder) {
this.webClient = builder
.baseUrl("http://order-service")
.defaultHeader(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE)
.build();
}
public Mono<OrderDTO> getOrderById(Long id) {
return webClient.get()
.uri("/api/orders/{id}", id)
.retrieve()
.onStatus(HttpStatusCode::is4xxClientError,
resp -> Mono.error(new RuntimeException("订单不存在")))
.bodyToMono(OrderDTO.class);
}
public Flux<OrderDTO> getOrdersByUserId(Long userId) {
return webClient.get()
.uri(uriBuilder -> uriBuilder
.path("/api/orders")
.queryParam("userId", userId)
.build())
.retrieve()
.bodyToFlux(OrderDTO.class);
}
public Mono<OrderDTO> createOrder(OrderCreateRequest request) {
return webClient.post()
.uri("/api/orders")
.bodyValue(request)
.retrieve()
.bodyToMono(OrderDTO.class);
}
}
import java.util.concurrent.Executors;
public class VirtualThreadDemo {
// 方式一:虚拟线程池
public static void runWithVirtualThreads() throws Exception {
try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
executor.submit(() ->
System.out.println("Task 1, virtual=" + Thread.currentThread().isVirtual()));
executor.submit(() ->
System.out.println("Task 2, virtual=" + Thread.currentThread().isVirtual()));
}
}
// 方式二:直接创建虚拟线程
public static void threadOfVirtual() {
Thread.ofVirtual()
.name("vt-", 0)
.start(() ->
System.out.println("虚拟线程名称: " + Thread.currentThread().getName()));
}
}
import java.util.concurrent.StructuredTaskScope;
import java.util.concurrent.StructuredTaskScope.ShutdownOnFailure;
public class StructuredConcurrencyDemo {
public static class Result {
String user;
String order;
String product;
}
public static Result fetchUserOrderProduct(long userId, long orderId, long productId) throws Exception {
try (var scope = new ShutdownOnFailure()) {
var userTask = scope.fork(() -> fetchUser(userId));
var orderTask = scope.fork(() -> fetchOrder(orderId));
var productTask = scope.fork(() -> fetchProduct(productId));
scope.join(); // 等待所有子任务完成
scope.throwIfFailed(); // 任一失败则抛异常
Result r = new Result();
r.user = userTask.get();
r.order = orderTask.get();
r.product = productTask.get();
return r;
}
}
private static String fetchUser(long id) throws InterruptedException {
Thread.sleep(100);
return "User-" + id;
}
private static String fetchOrder(long id) throws InterruptedException {
Thread.sleep(150);
return "Order-" + id;
}
private static String fetchProduct(long id) throws InterruptedException {
Thread.sleep(80);
return "Product-" + id;
}
}
application.yml:
spring:
threads:
virtual:
enabled: true
自定义异步任务使用虚拟线程:
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
@Configuration
public class VirtualThreadConfig {
@Bean
public Executor virtualThreadExecutor() {
return Executors.newVirtualThreadPerTaskExecutor();
}
}
@Service
class AsyncService {
@Async("virtualThreadExecutor")
public CompletableFuture<String> asyncTask() {
return CompletableFuture.completedFuture(
"run on: " + Thread.currentThread().getName() +
", virtual=" + Thread.currentThread().isVirtual()
);
}
}
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.Timer;
import io.micrometer.core.instrument.MeterRegistry;
import org.springframework.stereotype.Service;
import java.time.Duration;
@Service
public class OrderMetricsService {
private final Counter orderCreatedCounter;
private final Timer orderCreateTimer;
public OrderMetricsService(MeterRegistry registry) {
this.orderCreatedCounter = registry.counter("order.created", "env", "dev");
this.orderCreateTimer = registry.timer("order.create.duration");
}
public void recordOrderCreated() {
orderCreatedCounter.increment();
}
public void recordOrderCreateDuration(Runnable runnable) {
orderCreateTimer.record(runnable);
}
public void recordOrderCreateDuration(Duration duration) {
orderCreateTimer.record(duration);
}
}
application.yml 配置:
management:
endpoints:
web:
exposure:
include: health,info,prometheus
endpoint:
prometheus:
enabled: true
metrics:
tags:
application: ${spring.application.name}
依赖示例:
<dependency>
<groupId>io.micrometer</groupId>
<artifactId>micrometer-tracing-bridge-brave</artifactId>
</dependency>
<dependency>
<groupId>io.zipkin.reporter2</groupId>
<artifactId>zipkin-reporter-brave</artifactId>
</dependency>
配置示例:
management:
tracing:
sampling:
probability: 1.0
zipkin:
tracing:
endpoint:
在业务代码中打 Span:
import io.micrometer.tracing.Tracer;
import org.springframework.stereotype.Service;
@Service
public class OrderService {
private final Tracer tracer;
public OrderService(Tracer tracer) {
this.tracer = tracer;
}
public OrderDTO createOrder(CreateOrderRequest request) {
var span = tracer.nextSpan().name("createOrder").start();
try (Tracer.SpanInScope ws = tracer.withSpan(span)) {
span.tag("order.userId", String.valueOf(request.getUserId()));
// 订单创建逻辑...
return new OrderDTO();
} finally {
span.end();
}
}
}
logback-spring.xml 简化示例:
<configuration>
<include resource="org/springframework/boot/logging/logback/defaults.xml"/>
<appender name="JSON" class="ch.qos.logback.core.ConsoleAppender">
<encoder class="net.logstash.logback.encoder.LogstashEncoder">
<includeMdcKeyName>traceId</includeMdcKeyName>
</encoder>
</appender>
<root level="INFO">
<appender-ref ref="JSON"/>
</root>
</configuration>
在拦截器中写入 traceId:
import org.slf4j.MDC;
import org.springframework.web.servlet.HandlerInterceptor;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.util.UUID;
public class MdcInterceptor implements HandlerInterceptor {
private static final String TRACE_ID = "traceId";
@Override
public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) {
String traceId = request.getHeader("X-Trace-Id");
if (traceId == null || traceId.isEmpty()) {
traceId = UUID.randomUUID().toString();
}
MDC.put(TRACE_ID, traceId);
response.setHeader("X-Trace-Id", traceId);
return true;
}
@Override
public void afterCompletion(HttpServletRequest request, HttpServletResponse response,
Object handler, Exception ex) {
MDC.remove(TRACE_ID);
}
}
import org.springframework.stereotype.Service;
import reactor.core.publisher.Mono;
@Service
public class OrderFacadeService {
private final OrderService orderService; // 传统阻塞 service
private final ReactiveOrderClient reactiveClient; // 响应式 HTTP 客户端
public OrderFacadeService(OrderService orderService,
ReactiveOrderClient reactiveClient) {
this.orderService = orderService;
this.reactiveClient = reactiveClient;
}
// 阻塞式接口内部使用 WebClient(建议放在虚拟线程中)
public OrderAggregateDTO getOrderWithDetails(Long orderId) {
OrderDTO order = orderService.getById(orderId);
if (order == null) {
return null;
}
Long userId = order.getUserId();
Long productId = order.getProductId();
Mono<UserDTO> userMono = reactiveClient.getUserById(userId);
Mono<ProductDTO> productMono = reactiveClient.getProductById(productId);
UserDTO user = userMono.block(); // 在虚拟线程中阻塞开销更小
ProductDTO product = productMono.block();
OrderAggregateDTO agg = new OrderAggregateDTO();
agg.setOrder(order);
agg.setUser(user);
agg.setProduct(product);
return agg;
}
}
import org.springframework.boot.actuate.health.Health;
import org.springframework.boot.actuate.health.HealthIndicator;
import org.springframework.stereotype.Component;
import javax.sql.DataSource;
import org.springframework.data.redis.connection.RedisConnectionFactory;
@Component
public class CustomHealthIndicator implements HealthIndicator {
private final DataSource dataSource;
private final RedisConnectionFactory redisConnectionFactory;
public CustomHealthIndicator(DataSource dataSource,
RedisConnectionFactory redisConnectionFactory) {
this.dataSource = dataSource;
this.redisConnectionFactory = redisConnectionFactory;
}
@Override
public Health health() {
try {
dataSource.getConnection().close();
} catch (Exception e) {
return Health.down().withException(e).build();
}
try {
redisConnectionFactory.getConnection().ping();
} catch (Exception e) {
return Health.down().withException(e).build();
}
return Health.up().build();
}
}
application.yml:
management:
endpoint:
health:
probes:
enabled: true
health:
livenessState:
enabled: true
readinessState:
enabled: true
动漫共和国官网入口-OmoFun动漫共和国官网入口直达app最新下载v1.45.86
Snapseed官网入口下载-Snapseed官网入口app下载安装2026最新v347.40
2026-02-13
2026-02-13