Spring WebFlux 指南

响应式控制器

import org.springframework.web.bind.annotation.*; import reactor.core.publisher.*; @RestController @RequestMapping("/users") public class UserController { private final UserRepository repo; // 返回 Flux(0到N个元素) @GetMapping public Flux<User> listUsers() { return repo.findAll(); } // 返回 Mono(0或1个元素) @GetMapping("/{id}") public Mono<User> getUser(@PathVariable Long id) { return repo.findById(id) .switchIfEmpty(Mono.error(new NotFoundException())); } @PostMapping @ResponseStatus(HttpStatus.CREATED) public Mono<User> createUser(@RequestBody User user) { return repo.save(user); } }

Reactor 操作符

// Source sequence: the integers 1 through 10.
Flux<Integer> numbers = Flux.range(1, 10);

// Synchronous stages, named separately so each step can be read on its own.
Flux<Integer> firstThreeEvenSquares = numbers
    .filter(value -> value % 2 == 0)   // keeps 2, 4, 6, 8, 10
    .map(even -> even * even)          // 4, 16, 36, 64, 100
    .take(3);                          // 4, 16, 36

firstThreeEvenSquares
    .flatMap(square -> fetchData(square))     // asynchronous mapping
    .onErrorResume(failure -> Flux.empty())   // error recovery: fall back to an empty sequence
    .subscribe(System.out::println);          // print each emitted element

// Merging streams: pair up the elements of stream1 and stream2 and add each pair.
// NOTE(review): the resulting Flux is neither stored nor subscribed to in this snippet.
Flux.zip(stream1, stream2, (left, right) -> left + right);

R2DBC 响应式仓库

// 依赖 // spring-boot-starter-data-r2dbc // r2dbc-postgresql (or r2dbc-h2 for testing) public interface UserRepository extends ReactiveCrudRepository<User, Long> { Flux<User> findByName(String name); Mono<User> findByEmail(String email); @Query("SELECT * FROM users WHERE active = :active") Flux<User> findByActive(@Param("active") boolean active); }