트러블슈팅

WebClient를 이용한 백프레셔 적용하기

초록색거북이 2024. 10. 4. 13:40
728x90
반응형
SMALL

문제상황

 

제가 개발하고 있는 SMS 발송 서비스에서, 데이터 발행자가 수신자가 처리할 수 있는 양을 넘는 데이터를 전송하는 문제가 발생했습니다. 즉, 수신자가 감당할 수 없는 속도로 데이터가 쏟아지면서 시스템에 과부하가 걸리는 상황이었습니다. 이런 경우, 발행자가 데이터를 보내는 속도와 수신자가 처리할 수 있는 속도를 조율하는 백프레셔가 필요합니다.

 

기존코드

private void bizppurioSendMsg(CompletableFuture<TokenResDto> token, MessageReqDto messageReqDto, SmsReqDto smsReqDto) {
    token.thenAccept(tokenResDto -> {
        WebClient.builder()
                .baseUrl(bizppurioMsgApiUrl)
                .defaultHeader(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE)
                .defaultHeaders(httpHeaders -> httpHeaders.setBearerAuth(tokenResDto.getAccesstoken()))
                .build()
                .post()
                .body(Mono.just(messageReqDto), MessageReqDto.class)
                .retrieve()
                .onStatus(HttpStatusCode::is4xxClientError, clientResponse -> {
                    log.error("<<:메세지 전송 400 실패>> : {}", clientResponse.bodyToMono(String.class));
                    amqpTemplate.convertAndSend("sendFailSmsQueue", smsReqDto);
                    return Mono.error(new MtnCommonException(MtnErrorCode.GENERAL_FAIL.getCode(), MtnErrorCode.GENERAL_FAIL.getMessage()));
                })
                .onStatus(HttpStatusCode::is5xxServerError, clientResponse -> {
                    log.warn("<<:메세지 전송 500 실패>> : {}", clientResponse.bodyToMono(String.class));
                    amqpTemplate.convertAndSend("sendFailSmsQueue", smsReqDto);
return Mono.error(new MtnCommonException(MtnErrorCode.GENERAL_FAIL.getCode(), MtnErrorCode.GENERAL_FAIL.getMessage()));
                })
                .bodyToMono(MessageResDto.class)
                .timeout(Duration.ofMinutes(5))
                .subscribe(response -> {
                    log.info("<<메세지 전송 성공>> : {}", response);
                    amqpTemplate.convertAndSend("sendSuccessSmsQueue", smsReqDto);
                }, error -> {
                    log.error("<<:메세지 전송 실패>>", error);
                    amqpTemplate.convertAndSend("sendFailSmsQueue", smsReqDto);
                });
    });
}

 

수정코드

 private void bizppurioSendReserveMsg(CompletableFuture<TokenResDto> token, List<MessageListDto> messageListDtos, SmsQueueReserveRunDto smsQueueReserveRunDto) {
        token.thenAccept(tokenResDto -> {
            AtomicInteger requestCount = new AtomicInteger(0);
            AtomicInteger batchCount = new AtomicInteger(0);
            Flux.fromIterable(messageListDtos)
                    .flatMap(messageListDto -> webClient.post()
                            .headers(headers -> headers.setBearerAuth(tokenResDto.getAccesstoken()))  // 토큰 설정
                            .body(Mono.just(messageListDto.getMessageReqDto()), MessageReqDto.class) // 단일 DTO를 Mono로 감싸서 전송
                            .retrieve()
                            .onStatus(HttpStatusCode::is4xxClientError, clientResponse -> clientResponse.bodyToMono(String.class)
                                    .flatMap(errorBody -> {
                                        log.error("<<:메세지 전송 400 실패>> : {}", errorBody);
                                        amqpTemplate.convertAndSend("sendSmsQueue.exchange", "multiReserveSmsFailQueue",
                                                new SmsReserveQueueResponseDto(smsQueueReserveRunDto, null, null, messageListDto.getMsgId()));
                                        return Mono.error(new MtnCommonException(MtnErrorCode.GENERAL_FAIL.getCode(), MtnErrorCode.GENERAL_FAIL.getMessage()));
                                    }))
                            .onStatus(HttpStatusCode::is5xxServerError, clientResponse -> clientResponse.bodyToMono(String.class)
                                    .flatMap(errorBody -> {
                                        log.warn("<<:메세지 전송 500 실패>> : {}", errorBody);
                                        amqpTemplate.convertAndSend("sendSmsQueue.exchange", "multiReserveSmsFailQueue",
                                                new SmsReserveQueueResponseDto(smsQueueReserveRunDto, null, null, messageListDto.getMsgId()));
                                        return Mono.error(new MtnCommonException(MtnErrorCode.GENERAL_FAIL.getCode(), MtnErrorCode.GENERAL_FAIL.getMessage()));
                                    }))
                            .bodyToMono(MessageResDto.class)
                            .timeout(Duration.ofMinutes(5))
                            .doOnSuccess(response -> {
                                amqpTemplate.convertAndSend("sendSmsQueue.exchange", "multiReserveSmsSuccessQueue",
                                        new SmsReserveQueueResponseDto(smsQueueReserveRunDto, response.getMessagekey(), response.getRefkey(), messageListDto.getMsgId()));
                            })
                            .doOnError(error -> {
                                log.error("<<:메세지 전송 실패>> ", error);
                                amqpTemplate.convertAndSend("sendSmsQueue.exchange", "multiReserveSmsFailQueue",
                                        new SmsReserveQueueResponseDto(smsQueueReserveRunDto, null, null, messageListDto.getMsgId()));
                            }), 500)  // 병렬로 처리할 메시지 개수 제한
                    .doOnSubscribe(subscription -> {
                        batchCount.incrementAndGet();  // 새로운 그룹(배치)이 시작될 때마다 카운트 증가
                        log.info("<< Batch {} 시작 - 총 {}개의 요청 중, {}번째 묶음 실행 중 >>",
                                batchCount.get(), messageListDtos.size(), batchCount.get());
                    })
                    .doOnComplete(() -> {
                        log.info("<< Batch {} 종료 - 총 처리된 요청: {} / {} >>",
                                batchCount.get(), requestCount.get(), messageListDtos.size());
                    })
                    .doOnTerminate(() -> log.info("<< 요청 종료 >>"))
                    .subscribe();
        });
    }

 

개선된 부분

 

  • Flux와 flatMap 사용: 여러 메시지를 한 번에 처리하는 대신 Flux.fromIterable()로 메시지 목록을 스트리밍 처리하고, flatMap()으로 비동기 호출을 관리했습니다.
  • 병렬 처리 제한: flatMap()의 두 번째 파라미터를 통해 병렬로 처리할 요청의 개수를 500개로 제한했습니다. 이를 통해 시스템 과부하를 방지했습니다.
  • 로깅 및 배치 처리: 각 배치의 시작과 끝을 로깅하여 현재 진행 상황을 추적할 수 있게 했습니다. 이를 통해 시스템 모니터링이 더욱 용이해졌습니다.

 

결론

 

백프레셔는 대량의 비동기 요청을 처리할 때 발행자와 수신자 간의 속도를 조절하여 시스템 과부하를 방지하는 중요한 개념입니다. WebClient와 Flux를 활용하여 백프레셔를 구현함으로써, 대량의 메시지를 효율적으로 관리할 수 있게 되었습니다. 이처럼 비동기 메시지 처리에서 발생하는 문제를 해결하기 위해 백프레셔를 적용하는 방법을 잘 고려해야 합니다.

728x90
반응형
LIST