- 리액티브(reactive) 프로그래밍 : 데이터 스트림과 변경 사항 전파를 중심으로 하는 비동기 프로그래밍, 즉 데이터 흐름에 초점이 맞춰져 있고, 데이터를 비동기적으로 처리하며 이벤트 기반의 아키텍처로 실시간으로 데이터의 변화에 반응할 수 있는 프로그래밍이다.
- 논블로킹(non blocking) : A함수가 B함수를 호출해도 제어권은 그대로 자신이 가지고 있는다. 그래서 B 함수가 호출되어도 A함수는 계속 실행된다.
- 명령형 논리(명령을 순서대로 처리), JDBC, JPA
- 하나의 요청에 대해 하나의 쓰레드가 사용된다. 다량의 요청에 대비 쓰레드 풀을 생성해놓고 각 요청마다 쓰레드를 할당하여 처리한다.
- 1 request : 1 thread, sync + blocking
- 기능적 엔드 포인트, 이벤트 루프, 동시성 모델
- 논블로킹과 고정된 쓰레드 수로 모든 요청을 처리한다. 서버는 쓰레드 한 개로 운영하며, 디폴트로 CPU 코어 수 갯수의 쓰레드를 가진 워커 풀을 생성하여 워커 풀 내 쓰레드로 요청을 처리한다.
- many request : 1 thread, async + non-blocking
- H2 (io.r2dbc:r2dbc-h2)
- MariaDB (org.mariadb:r2dbc-mariadb)
- Microsoft SQL Server (io.r2dbc:r2dbc-mssql)
- MySQL (io.asyncer:r2dbc-mysql)
- jasync-sql MySQL (com.github.jasync-sql:jasync-r2dbc-mysql)
- Postgres (io.r2dbc:r2dbc-postgresql)
- Oracle (com.oracle.database.r2dbc:oracle-r2dbc)
- 두 개 다 Reactor Streams의 publisher 인터페이스의 구현체이다.
implementation 'org.springframework.boot:spring-boot-starter-webflux'
@RestController
public class FluxController {
@GetMapping("/")
Flux<String> justFlux() {
return Flux.just("Hello", "Flux");
}
}
curl localhost:8080
요청 시, text/plain 으로 응답 : HelloFluxcurl localhost:8080 -H 'Accept: text/event-stream'
Server-Sent Event 요청 시 : 아래와 같이 응답data:Hello
data:Flux
curl localhost:8080 -H 'Accept: application/stream+json'
JSON Stream으로 요청 시 : HelloFlux
- -i는 –include 응답 헤더를 출력
@RestController
public class FluxController {
@GetMapping("/stream")
Flux<Map<String, Integer>> stream() {
Stream<Integer> stream = Stream.iterate(0, i -> i + 1);
return Flux.fromStream(stream.limit(5))
.map(i -> Collections.singletonMap("value", i));
}
}
curl localhost:8080/stream
일반 JSON 요청 시 : [{"value":0},{"value":1},{"value":2},{"value":3},{"value":4}]
curl localhost:8080/stream 'Accept: text/event-stream'
Server-Sent Event 요청 시 : 아래와 같이 응답data:{"value":0}
data:{"value":1}
data:{"value":2}
data:{"value":3}
data:{"value":4}
curl localhost:8080/stream -H 'Accept: application/stream+json'
JSON Stream 요청 시 : 아래와 같이 응답{"value":0}
{"value":1}
{"value":2}
{"value":3}
{"value":4}
- 두 패턴은 공존할 수 없기 때문에 같은 요청으로 구현한 것은 무시되었다.
@Bean
RouterFunction<ServerResponse> routes() {
return RouterFunctions.route(RequestPredicates.GET("/"), req -> ServerResponse
.ok().body(Flux.just("Hello", "Flux"), String.class));
}
curl -i localhost:8080
요청 시, text/plain 으로 응답 : HelloFlux
@Component
public class FluxHandler {
@Bean
public RouterFunction<ServerResponse> routes() {
return route(GET("/"), this::hello)
.andRoute(GET("/stream"), this::stream)
.andRoute(POST("/upper"), this::upper);
}
public Mono<ServerResponse> hello(ServerRequest req) {
return ok().body(Flux.just("Hello", "Flux"), String.class);
}
public Mono<ServerResponse> stream(ServerRequest req) {
Stream<Integer> stream = Stream.iterate(0, i -> i + 1);
Flux<Map<String, Integer>> flux = Flux.fromStream(stream.limit(5))
.map(i -> Collections.singletonMap("value", i));
return ok().contentType(MediaType.APPLICATION_NDJSON)
.body(fromPublisher(flux, new ParameterizedTypeReference<Map<String, Integer>>(){}));
}
public Mono<ServerResponse> upper(ServerRequest req) {
Mono<String> body = req.bodyToMono(String.class).map(String::toUpperCase);
return ok().body(body, String.class);
}
}
curl localhost:8080/stream
요청 시 : 아래와 같이 응답{"value":0}
{"value":1}
{"value":2}
{"value":3}
{"value":4}
curl localhost:8080/upper -d hello
요청 시 : HELLO@Configuration
public class RouterConfig {
@Bean
RouterFunction<ServerResponse> routes(UserHandler handler) {
return route(GET("/handler/users").and(accept(MediaType.APPLICATION_JSON)), handler::getAllUsers)
.andRoute(GET("/handler/users/{userId}").and(contentType(MediaType.APPLICATION_JSON)), handler::getUserById)
.andRoute(POST("/handler/users").and(accept(MediaType.APPLICATION_JSON)), handler::create)
.andRoute(PUT("/handler/users/{userId}").and(contentType(MediaType.APPLICATION_JSON)), handler::updateUserById)
.andRoute(DELETE("/handler/users/{userId}").and(accept(MediaType.APPLICATION_JSON)), handler::deleteUserById);
}
}
@Component
@RequiredArgsConstructor
public class UserHandler {
private final UserService userService;
public Mono<ServerResponse> getAllUsers(ServerRequest request) {
return ServerResponse
.ok()
.contentType(MediaType.APPLICATION_JSON)
.body(userService.getAllUsers(), User.class);
}
public Mono<ServerResponse> getUserById(ServerRequest request) {
return userService
.findById(request.pathVariable("userId"))
.flatMap(user -> ServerResponse
.ok()
.contentType(MediaType.APPLICATION_JSON)
.body(user, User.class)
)
.switchIfEmpty(ServerResponse.notFound().build());
}
public Mono<ServerResponse> create(ServerRequest request) {
Mono<User> user = request.bodyToMono(User.class);
return user
.flatMap(u -> ServerResponse
.status(HttpStatus.CREATED)
.contentType(MediaType.APPLICATION_JSON)
.body(userService.createUser(u), User.class)
);
}
public Mono<ServerResponse> updateUserById(ServerRequest request) {
String id = request.pathVariable("userId");
Mono<User> updatedUser = request.bodyToMono(User.class);
return updatedUser
.flatMap(u -> ServerResponse
.ok()
.contentType(MediaType.APPLICATION_JSON)
.body(userService.updateUser(id, u), User.class)
);
}
public Mono<ServerResponse> deleteUserById(ServerRequest request){
return userService.deleteUser(request.pathVariable("userId"))
.flatMap(u -> ServerResponse.ok().body(u, User.class))
.switchIfEmpty(ServerResponse.notFound().build());
}
}
- 위는 reflectoring 의 예시를 가져왔다. 아래 주소에서 원본글을 확인할 수 있다.
- HTTP Client 라이브러리를 상속받고 디폴트로 Netty의 HTTP Client를 사용한다.
동기적인 작업은 RestTemplate, 비동기적인 작업은 WebClient 가 적합하다.
@Component
@RequiredArgsConstructor
public class WebClientUtil {
private final WebClient webClient;
public WebClient.ResponseSpec getFakeUsers() {
return webClient
.get()
.uri("https://randomuser.me/api/")
.retrieve();
}
public Mono<User> postUser(User user) {
return webClient
.post()
.uri("http://localhost:9000/api/users")
.header("Authorization", "Basic MY_PASSWORD")
.accept(MediaType.APPLICATION_JSON)
.body(Mono.just(user), User.class)
.retrieve()
.bodyToMono(User.class)
.log()
.retryWhen(Retry.backoff(10, Duration.ofSeconds(2)))
.onErrorReturn(new User())
.doOnError(throwable -> log.error("Result error out for POST user", throwable))
.doFinally(signalType -> log.info("Result Completed for POST User: {}", signalType));
}
}
Reference: