728x90
반응형
SMALL
문제상황
제가 개발하고 있는 SMS 발송 서비스에서, 데이터 발행자가 수신자가 처리할 수 있는 양을 넘는 데이터를 전송하는 문제가 발생했습니다. 즉, 수신자가 감당할 수 없는 속도로 데이터가 쏟아지면서 시스템에 과부하가 걸리는 상황이었습니다. 이런 경우, 발행자가 데이터를 보내는 속도와 수신자가 처리할 수 있는 속도를 조율하는 백프레셔가 필요합니다.
기존코드
private void bizppurioSendMsg(CompletableFuture<TokenResDto> token, MessageReqDto messageReqDto, SmsReqDto smsReqDto) {
token.thenAccept(tokenResDto -> {
WebClient.builder()
.baseUrl(bizppurioMsgApiUrl)
.defaultHeader(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE)
.defaultHeaders(httpHeaders -> httpHeaders.setBearerAuth(tokenResDto.getAccesstoken()))
.build()
.post()
.body(Mono.just(messageReqDto), MessageReqDto.class)
.retrieve()
.onStatus(HttpStatusCode::is4xxClientError, clientResponse -> {
log.error("<<:메세지 전송 400 실패>> : {}", clientResponse.bodyToMono(String.class));
amqpTemplate.convertAndSend("sendFailSmsQueue", smsReqDto);
return Mono.error(new MtnCommonException(MtnErrorCode.GENERAL_FAIL.getCode(), MtnErrorCode.GENERAL_FAIL.getMessage()));
})
.onStatus(HttpStatusCode::is5xxServerError, clientResponse -> {
log.warn("<<:메세지 전송 500 실패>> : {}", clientResponse.bodyToMono(String.class));
amqpTemplate.convertAndSend("sendFailSmsQueue", smsReqDto);
return Mono.error(new MtnCommonException(MtnErrorCode.GENERAL_FAIL.getCode(), MtnErrorCode.GENERAL_FAIL.getMessage()));
})
.bodyToMono(MessageResDto.class)
.timeout(Duration.ofMinutes(5))
.subscribe(response -> {
log.info("<<메세지 전송 성공>> : {}", response);
amqpTemplate.convertAndSend("sendSuccessSmsQueue", smsReqDto);
}, error -> {
log.error("<<:메세지 전송 실패>>", error);
amqpTemplate.convertAndSend("sendFailSmsQueue", smsReqDto);
});
});
}
수정코드
private void bizppurioSendReserveMsg(CompletableFuture<TokenResDto> token, List<MessageListDto> messageListDtos, SmsQueueReserveRunDto smsQueueReserveRunDto) {
token.thenAccept(tokenResDto -> {
AtomicInteger requestCount = new AtomicInteger(0);
AtomicInteger batchCount = new AtomicInteger(0);
Flux.fromIterable(messageListDtos)
.flatMap(messageListDto -> webClient.post()
.headers(headers -> headers.setBearerAuth(tokenResDto.getAccesstoken())) // 토큰 설정
.body(Mono.just(messageListDto.getMessageReqDto()), MessageReqDto.class) // 단일 DTO를 Mono로 감싸서 전송
.retrieve()
.onStatus(HttpStatusCode::is4xxClientError, clientResponse -> clientResponse.bodyToMono(String.class)
.flatMap(errorBody -> {
log.error("<<:메세지 전송 400 실패>> : {}", errorBody);
amqpTemplate.convertAndSend("sendSmsQueue.exchange", "multiReserveSmsFailQueue",
new SmsReserveQueueResponseDto(smsQueueReserveRunDto, null, null, messageListDto.getMsgId()));
return Mono.error(new MtnCommonException(MtnErrorCode.GENERAL_FAIL.getCode(), MtnErrorCode.GENERAL_FAIL.getMessage()));
}))
.onStatus(HttpStatusCode::is5xxServerError, clientResponse -> clientResponse.bodyToMono(String.class)
.flatMap(errorBody -> {
log.warn("<<:메세지 전송 500 실패>> : {}", errorBody);
amqpTemplate.convertAndSend("sendSmsQueue.exchange", "multiReserveSmsFailQueue",
new SmsReserveQueueResponseDto(smsQueueReserveRunDto, null, null, messageListDto.getMsgId()));
return Mono.error(new MtnCommonException(MtnErrorCode.GENERAL_FAIL.getCode(), MtnErrorCode.GENERAL_FAIL.getMessage()));
}))
.bodyToMono(MessageResDto.class)
.timeout(Duration.ofMinutes(5))
.doOnSuccess(response -> {
amqpTemplate.convertAndSend("sendSmsQueue.exchange", "multiReserveSmsSuccessQueue",
new SmsReserveQueueResponseDto(smsQueueReserveRunDto, response.getMessagekey(), response.getRefkey(), messageListDto.getMsgId()));
})
.doOnError(error -> {
log.error("<<:메세지 전송 실패>> ", error);
amqpTemplate.convertAndSend("sendSmsQueue.exchange", "multiReserveSmsFailQueue",
new SmsReserveQueueResponseDto(smsQueueReserveRunDto, null, null, messageListDto.getMsgId()));
}), 500) // 병렬로 처리할 메시지 개수 제한
.doOnSubscribe(subscription -> {
batchCount.incrementAndGet(); // 새로운 그룹(배치)이 시작될 때마다 카운트 증가
log.info("<< Batch {} 시작 - 총 {}개의 요청 중, {}번째 묶음 실행 중 >>",
batchCount.get(), messageListDtos.size(), batchCount.get());
})
.doOnComplete(() -> {
log.info("<< Batch {} 종료 - 총 처리된 요청: {} / {} >>",
batchCount.get(), requestCount.get(), messageListDtos.size());
})
.doOnTerminate(() -> log.info("<< 요청 종료 >>"))
.subscribe();
});
}
개선된 부분
- Flux와 flatMap 사용: 여러 메시지를 한 번에 처리하는 대신 Flux.fromIterable()로 메시지 목록을 스트리밍 처리하고, flatMap()으로 비동기 호출을 관리했습니다.
- 병렬 처리 제한: flatMap()의 두 번째 파라미터를 통해 병렬로 처리할 요청의 개수를 500개로 제한했습니다. 이를 통해 시스템 과부하를 방지했습니다.
- 로깅 및 배치 처리: 각 배치의 시작과 끝을 로깅하여 현재 진행 상황을 추적할 수 있게 했습니다. 이를 통해 시스템 모니터링이 더욱 용이해졌습니다.
결론
백프레셔는 대량의 비동기 요청을 처리할 때 발행자와 수신자 간의 속도를 조절하여 시스템 과부하를 방지하는 중요한 개념입니다. WebClient와 Flux를 활용하여 백프레셔를 구현함으로써, 대량의 메시지를 효율적으로 관리할 수 있게 되었습니다. 이처럼 비동기 메시지 처리에서 발생하는 문제를 해결하기 위해 백프레셔를 적용하는 방법을 잘 고려해야 합니다.
728x90
반응형
LIST
'트러블슈팅' 카테고리의 다른 글
대용량 데이터 삽입 시 JPA를 JDBC로 교체한 성능 최적화 사례 (2) | 2024.10.04 |
---|---|
Redis를 이용한 예약 메세지 시스템 개선: RabbitMQ의 FIFO 한계를 극복한 방법 (2) | 2024.10.04 |