개요
이전 글에서 비디오 플레이어를 구현했고, 이번에는 동영상 업로드, 스트리밍을 구현할 겁니다. 이것저것 수정한게 많아서 1편의 비디오 플레이어가 동작하지 않을 수 있습니다. 물론 복사/붙혀넣기로 따라하는 사람은 없겠죠? 그러니 그냥 참고만 하시고 만들어보는건 알아서 해보시기 바랍니다.
구조
Main Server와 Convert Server 사이에서 작업을 분산처리하기 위해 RabbitMQ를 도입했습니다.
RabbitMQ가 있음으로써 Main Server와 Convert Server 사이에 결합도가 사라집니다.
Main Server는 업로드 해야할 데이터를 Message에 담아 보내기만 하면 RabbitMQ에서 구독하고있는 Consumer들에게 분배하기 때문이죠. 따라서 Message 스펙만 동일하다면 Convert Server의 동작방식에 대해서 아무런 제약을 받지않습니다. 그리고 Convert Server의 확장도 자유롭게 가능하죠. HLS 변환 작업은 상당히 무거운 작업이기 때문에 Convert Server를 증설할 가능성이 매우 높습니다. 따라서 RabbitMQ는 비디오 스트리밍 서버를 구성하는데 있어서 상당히 중요한 역할을 합니다.
RabbitMQ
RabbitMQ에 대해서는 따로 글을 작성할건데 이 글을 이해하려면 짚고 넘어가야 해서 간단하게만 설명하고 넘어가겠습니다.
RabbitMQ는 메세지 브로커라고 합니다. 우체국 같은거죠. 보내는 사람은 받는 사람의 정보와 내용을 적어 편지를 보냅니다.
그럼 우체국은 받는 사람의 정보를 보고 그곳으로 발송해줍니다. 이때 보낸 사람과 받는 사람은 우체국에서 어떻게 보냈는지 몰라도됩니다. 어떤 직원이 뭘 타고왔는지 같은건 중요하지 않습니다. 이 역할을 하는 것이 RabbitMQ입니다.
여기에서 보낸사람을 Producer, 받는사람을 Consumer 라고합니다.
또 한가지, 받은사람은 우체국에게 메세지를 성공적으로 처리했는지 못했는지를 응답해줄 수 있습니다. 그걸 Ack 라고합니다. 그래서 우체국에서 Ack를 받아 메세지를 다시 보내거나 메세지를 삭제 해버리는 등 작업을 지시할 수 있습니다.
이정도만 설명해도 아래 글을 이해하는데 전혀 문제 없을것같습니다.
Main Server
메인서버에서는 동영상 스트리밍과 동영상 업로드를 다룹니다.
Page
@Controller
@RequiredArgsConstructor
public class MainController {
private final EntityService entityService;
@GetMapping("/upload")
public String uploadPage() {
return "upload";
}
@GetMapping
public String mainPage() {
return "main";
}
@GetMapping("/status")
public String statusPage() {
return "status";
}
@GetMapping("/video/{videoId}")
public String videoPage(@PathVariable String videoId, Model model) {
ResponseVideo videoDto = entityService.findByVideoId(videoId);
model.addAttribute("video", videoDto);
return "video";
}
}
@RestController
@RequiredArgsConstructor
@RequestMapping("/api/v1")
public class VideoController {
private final EntityService entityService;
private final StatusJpaRepository statusRepository;
@GetMapping("/videos/status")
public ResponseEntity<ResponseVideoStatus> getVideoStatus(@RequestParam(name = "q") String videoId) {
return statusRepository.findById(videoId)
.map(videoStatus -> ResponseEntity.ok(new ResponseVideoStatus(videoStatus.getStatus().name())))
.orElse(ResponseEntity.notFound().build());
}
@GetMapping("/videos")
public ResponseEntity<List<ResponseVideo>> getVideos() {
List<ResponseVideo> videos = entityService.findAll();
return ResponseEntity.ok(videos);
}
}
이번 프로젝트에서는 Entity를 직접 가져다 썼습니다. 동영상리스트를 보여주거나 하는건 이번 프로젝트에서 중요한 부분이 아니기 때문에 따라하시면 안됩니다.
"/api/v1/videos/status" 는 statusKey를 통해 현재 동영상 업로드 상황을 받을 수 있습니다.
Upload
@RestController
@RequiredArgsConstructor
@RequestMapping("/api/v1")
public class UploadController {
private final UploadService uploadService;
@PostMapping("/upload")
public ResponseEntity<VideoUploadResponse> upload(@ModelAttribute VideoUploadRequest videoUploadRequest) {
try {
ConvertRequest request = uploadService.createVideoConvertRequest(videoUploadRequest);
uploadService.upload(request);
return ResponseEntity.ok(VideoUploadResponse.success(request.getKey(), "Video upload started"));
} catch (RuntimeException e) {
return ResponseEntity.badRequest().body(VideoUploadResponse.fail("Video upload failed"));
}
}
}
upload 로직은 다음과 같습니다.
- Storage Server에 썸네일과 원본동영상을 업로드 시키고, 그 응답값으로 각각의 Key를 받습니다.
- 각각의 Key를 담은 Message를 전송합니다.
왜 모든 데이터를 한번에 Message에 담지않고, 이런 복잡한 과정을 거치도록 했을까요?
- RabbitMQ에 File 데이터를 직접 담을 수 없다.
- Base64 인코딩을 통해 보낼 수는 있지만 원본동영상의 특성상 대용량 데이터이기 때문에 메세지 크기가 매우 커질 수 있고, RabbitMQ의 리소스가 낭비될 수 있다.
- Message를 보내기 전에 Storage Server에 원본동영상을 업로드요청하면서 동영상의 형식을 제한하여 업로드 할 수 있고, 그에 따라 불필요한 업로드 요청을 줄일 수 있다.
- Main Server -> Convert Server 로 썸네일과 원본동영상을 전송한다고 하면 Main Server에서 Convert Server를 특정해야하고 각각의 Convert Server마다 Storage를 관리해야하는 문제와 Convert Server 다운 시 RabbitMQ에 재전송 요청이 들어온 데이터를 다른 Convert Server로 재전송 하지 못하고 해당 Convert Server가 복구될 때까지 업로드 하지 못하는 문제가 발생합니다.
- Convert Server 에서 convert 중 예기치 못한 서버 다운 시에 사용자의 재요청 없이 안전하게 다시 변환작업을 수행할 수 있다.
- Convert Server 종료 -> RabbitMQ에 nack전송(메세지 전송 실패, 재전송 요청)
- Convert Server 복구 -> RabbitMQ에서 Convert Server로 메세지 재전송 -> 변환작업
위의 이유로 썸네일과 원본동영상을 Main Server에서 Storage Server로 먼저 보내는 것입니다.
@Service
@RequiredArgsConstructor
public class UploadService {
private final UploadMessageProducer producer;
private final StorageService storageService;
public ConvertRequest createVideoConvertRequest(VideoUploadRequest upload) {
String statusKey = UUID.randomUUID().toString();
String thumbnail = null;
String originalVideo = null;
try {
thumbnail = storageService.uploadThumbnail(upload.thumbnail());
originalVideo = storageService.uploadOriginalVideo(upload.video());
return ConvertRequest.builder()
.key(statusKey)
.thumbnail(thumbnail)
.originalVideo(originalVideo)
.title(upload.title())
.build();
} catch (IOException e) {
storageService.delete(THUMBNAIL, thumbnail);
storageService.delete(ORIGINAL_VIDEO, originalVideo);
throw new RuntimeException(e);
}
}
public void upload(ConvertRequest request) {
producer.sendMessage(request);
}
}
내부 로직입니다. StorageServer에 썸네일이나 원본동영상이 정상적으로 업로드되지 않았다면 StorageServer에 데이터 삭제요청을 보냅니다.
public interface StorageService {
String uploadThumbnail(MultipartFile image) throws IOException;
String uploadOriginalVideo(MultipartFile video) throws IOException;
void delete(PathType pathType, String storeKey);
}
StorageService는 인터페이스로 S3나 내부저장서버에 맞게 추상화 해놓았습니다.
RabbitMQ
public class ConvertRequest {
private String key;
private String thumbnail;
private String originalVideo;
private String title;
}
ConvertRequest는 RabbitMQ에서 사용할 Message 스펙입니다. 썸네일과 원본동영상을 Storage Server에 저장하고 업로드 상태를 확인할 수 있는 key와 함께 Message를 전송합니다. 그리고 Controller에서 key를 활용해 동영상 업로드 상태를 유저가 실시간으로 확인할 수 있습니다.
@Configuration
public class MessageConfig {
@Bean
public Queue queue() {
return new Queue("video-queue");
}
@Bean
public MessageConverter jsonMessageConverter() {
return new Jackson2JsonMessageConverter();
}
}
@Service
@RequiredArgsConstructor
public class UploadMessageProducer {
private final RabbitTemplate rabbitTemplate;
public void sendMessage(ConvertRequest request) {
rabbitTemplate.convertAndSend("video-queue", request);
}
}
MessageConfig와 Producer 설정입니다.
Resource
@RestController
public class ResourceController {
@Value("${storage.server.address}")
private String storageAddress;
@GetMapping("/thumbnail/{filename}")
public Resource downloadImage(@PathVariable String filename) throws MalformedURLException {
return new UrlResource(concat("thumbnail", filename));
}
@GetMapping("/play/{videoId}/master.m3u8")
public Resource downloadVideo(@PathVariable String videoId) throws MalformedURLException {
return new UrlResource(concat("video", videoId, "master.m3u8"));
}
@GetMapping("/play/{videoId}/{stream}/{segment}")
public Resource downloadPlaylist(@PathVariable String videoId, @PathVariable String stream, @PathVariable String segment) throws MalformedURLException {
return new UrlResource(concat("video", videoId, stream, segment));
}
private String concat(String... urls) {
return storageAddress + "/" + String.join("/", urls);
}
}
동영상 플레이어 -> Strage Server 로 직접접근하지 않고
동영상 플레이어 -> Main Server -> Storage Server 를 통해서 접근하도록 했습니다.
Storage Server에 직접접근하면 보안에 취약해지기 때문에 오로지 Main Server로만 Storage Server에 접근하도록 했습니다.
이에 따라서 Storage Server에서 CORS 설정을 통해 외부 접근을 제한할 수 있게되었습니다.
메인서버의 설정이 모두 끝났습니다. 세부 구현사항은 굳이 작성할 필요가 없을것같아서 작성하지 않았습니다. 이정도만 작성해도 어떻게 돌아가는지 대충 보이지 않나요?
Convert Server
Config
@Slf4j
@Configuration
public class ExecutorConfig {
@Bean
public BlockingQueue<Runnable> videoQueue() {
return new LinkedBlockingQueue<>();
}
@Bean
public ThreadPoolExecutor videoExecutor() {
ThreadPoolExecutor executor = new ThreadPoolExecutor(
2, // 코어 스레드 수
2, // 최대 스레드 수
0L,
TimeUnit.MILLISECONDS,
videoQueue(),
new ThreadPoolExecutor.CallerRunsPolicy() // 거부된 작업 처리 정책 추가
);
// 예외 모니터링 추가
executor.setThreadFactory(r -> {
Thread t = new Thread(r);
t.setUncaughtExceptionHandler((thread, e) -> {
log.error("Thread {} threw exception: {}", thread.getName(), e.getMessage(), e);
});
return t;
});
return executor;
}
}
Thread 관련 설정입니다. 최대 2개의 스레드만 허용하도록 했습니다.
@Configuration
public class MessageConfig {
@Bean
public Queue queue() {
return new Queue("video-queue");
}
@Bean
public MessageConverter jsonMessageConverter() {
return new Jackson2JsonMessageConverter();
}
@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
factory.setPrefetchCount(1);
factory.setMessageConverter(jsonMessageConverter());
return factory;
}
}
Message 설정입니다. Ack 설정을 MANUAL로 하기 위해서 Bean에 등록해주었습니다. (AUTO 로 사용할거면 등록하지 않아도 자동으로 됨)
@Configuration
public class DirectoryConfig {
@Bean
SimpleFileVisitor<Path> visitor() {
return new SimpleFileVisitor<>() {
@Override
public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
Files.delete(file);
return FileVisitResult.CONTINUE;
}
@Override
public FileVisitResult postVisitDirectory(Path dir, IOException exc) throws IOException {
Files.delete(dir);
return FileVisitResult.CONTINUE;
}
};
}
}
디렉토리 설정입니다. Convert Server에서 임시로 Hls변환 데이터를 저장게 되는데 이때 Storage Server에 업로드를 하고 임시데이터를 한번에 삭제하기 위해 다음과 같이 설정해주었습니다.
public class DirectoryManager {
private final SimpleFileVisitor<Path> deleteVisitor;
public void deleteIfExists(Path path) {
try {
if (Files.isDirectory(path)) {
Files.walkFileTree(path, deleteVisitor);
} else {
Files.deleteIfExists(path);
}
} catch (IOException e) {
log.error("Failed to delete: {}", path, e);
}
}
}
DirectoryConfig는 다음과 같이 사용됩니다.
Consumer
@Service
@RequiredArgsConstructor
public class UploadMessageConsumer {
private final StatusService statusService;
private final UploadService uploadService;
@RabbitListener(queues = "video-queue", ackMode = "MANUAL")
public void receiveMessage(ConvertRequest message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) {
statusService.uploadStatus(message);
uploadService.upload(message, channel, tag);
}
}
@Service
@RequiredArgsConstructor
public class StatusService {
private final StatusRepository statusRepository;
@Transactional
public void uploadStatus(ConvertRequest message) {
statusRepository.save(VideoStatus.builder()
.statusKey(message.getKey())
.status(ProcessingStatus.PENDING)
.thumbnail(message.getThumbnail())
.originalVideo(message.getOriginalVideo())
.title(message.getTitle())
.build());
}
}
Message를 받게 되면 먼저 VideoStatus를 등록해주고,
@Slf4j
@Service
@RequiredArgsConstructor
public class UploadService {
private final UploadRepository uploadRepository;
private final StatusRepository statusRepository;
private final ExecutorRecovery executorRecovery;
public void upload(ConvertRequest message, Channel channel, long tag) {
try {
executorRecovery.execute(
new UploadStatus(message, channel, tag),
uploadRepository.submit(message, channel, tag)
);
} catch (RejectedExecutionException e) { // Thread Exception
log.error("Upload rejected: {}", e.getMessage(), e);
statusRepository.findById(message.getKey()).ifPresent(status -> status.setError("Upload rejected due to server load"));
throw new RuntimeException("Server is busy, please try again later", e);
}
}
}
변환작업을 시작해줍니다.
@Slf4j
@Component
@RequiredArgsConstructor
public class ExecutorRecovery {
private final Map<String, UploadStatus> activeUploads = new ConcurrentHashMap<>();
private final ExecutorService executor;
private final DirectoryManager directoryManager;
private final PathManager pathManager;
public void execute(UploadStatus status, Runnable runnable) throws RejectedExecutionException {
String key = status.message().getKey();
activeUploads.put(key, status);
executor.submit(() -> {
try {
runnable.run();
} finally {
activeUploads.remove(key);
}
});
}
@EventListener(ContextClosedEvent.class)
public void shutdown() {
// 현재 진행 중인 작업들의 상태 저장
activeUploads.forEach((key, status) -> {
try(Channel channel = status.channel()) {
channel.basicNack(status.tag(), false, true);
directoryManager.deleteIfExists(pathManager.get(VIDEO, key));
} catch (Exception e) {
log.error("Failed to handle shutdown for upload: {}", key, e);
}
});
}
}
Thread 작업 전에 ack, nack 전송을 위해 템플릿을 하나 만들었습니다.
ExecutorRecovery 클래스는 HLS 변환작업 중 서버가 종료되었을 때 메세지를 RabbitMQ에 반환하여 다른 Convert Server에서 요청을 처리하도록 만들었습니다.
@PreDestroy 를 사용하지 않은 이유는 Context가 먼저 종료되어 데이터가 남아있지 않기 때문에 ContextClosedEvent 이벤트를 감지해서 나머지 작업들을 nack 를 반환하도록 했습니다.
@Slf4j
@Repository
@RequiredArgsConstructor
public class UploadRepository {
private final StorageService storageService;
private final StatusRepository statusRepository;
private final VideoService videoService;
private final EntityService entityService;
private final ThreadTransactionManager threadTransactionManager;
public Runnable submit(ConvertRequest message, Channel channel, long tag) {
return () -> {
threadTransactionManager.updateStatus(message.getKey(), PROCESSING);
File originalVideo = null;
Path videoPath = null;
try {
originalVideo = storageService.download(ORIGINAL_VIDEO, message.getOriginalVideo());
VideoStatus videoStatus = statusRepository.findById(message.getKey())
.orElseThrow(() -> new RuntimeException("Video status not found"));
videoPath = videoService.generateHls(originalVideo, message);
entityService.saveVideoEntity(message.getKey(), videoStatus);
storageService.uploadVideo(videoPath);
storageService.delete(ORIGINAL_VIDEO, message.getOriginalVideo());
threadTransactionManager.updateStatus(message.getKey(), COMPLETED);
// 성공적으로 처리 완료 시 ack 보내기
channel.basicAck(tag, false);
} catch (Exception e) {
log.error("Error processing upload: {}", e.getMessage(), e);
threadTransactionManager.updateStatus(message.getKey(), videoStatus -> {
storageService.delete(THUMBNAIL, videoStatus.getThumbnail());
storageService.delete(ORIGINAL_VIDEO, videoStatus.getOriginalVideo());
videoStatus.setError("Upload failed: " + e.getMessage());
});
try {
// 실패 시 nack 보내기 (재처리 요청 x)
channel.basicNack(tag, false, false);
} catch (IOException ex) {
ex.printStackTrace();
}
} finally {
if (originalVideo != null) {
originalVideo.deleteOnExit();
}
if (videoPath != null) {
videoPath.toFile().deleteOnExit();
}
}
};
}
}
어우.. 지저분하죠.. 이것저것 붙히다보니까 이렇게 나오네요.. ThreadTransactionManager 는 Thread 내에서 영속성컨텍스트에 데이터를 적재하지 않고 바로바로 flush 하게 하려고 트랜잭션을 분리하기 위해 사용되었습니다. 업로드 상태는 Thread 가 실행되면서 Processing으로 변경되고, 종료가 될때 Completed 로 변경되어야 합니다. 이 사이에 유저는 현재 상태를 확인하기 때문에 ThreadTransactionManager를 하나 만들어서 사용했습니다.
위의 코드는 다음과 같은 일을합니다.
- 업로드 상태를 PROCESSING 으로 변경
- 원본 동영상을 Storage Server에서 받아옴
- HLS 변환
- Video Entity 생성
- HLS 변환된 파일을 Storage Server에 업로드
- 임시 원본 동영상 삭제
- 업로드 상태를 COMPLETED 로 변경
- 위 과정에서 예외 발생 시는 변환을 할 수 없는 동영상인 경우에 해당 메세지 재요청 없이 데이터 삭제
- 썸네일, 원본동영상 Storage Server에서 삭제
- 업로드 상태를 FAILED 로 변경
- Rabbit에 nack 요청 (재요청 x)
- finally 에서 Convert Server에 남아있는 임시 원본 동영상, HLS 변환파일을 삭제
하는일이 참 많습니다. 어쨌든 어떻게 돌아가는지는 알았으니까 다음으로 넘어가겠습니다.
@Override
public void uploadVideo(Path hlsDirectory) throws IOException {
// 임시 ZIP 파일 생성
File zipFile = File.createTempFile("hls-", ".zip");
try {
// HLS 디렉토리를 ZIP으로 압축
try (ZipOutputStream zipOut = new ZipOutputStream(new FileOutputStream(zipFile))) {
String folderName = hlsDirectory.getFileName().toString(); // 최상위 폴더명
// 먼저 폴더 엔트리 추가
zipOut.putNextEntry(new ZipEntry(folderName + "/"));
zipOut.closeEntry();
Files.walk(hlsDirectory)
.filter(path -> !Files.isDirectory(path))
.forEach(path -> {
try {
// folderName/나머지경로 형태로 생성
String entryPath = folderName + "/" + hlsDirectory.relativize(path);
ZipEntry zipEntry = new ZipEntry(entryPath);
zipOut.putNextEntry(zipEntry);
Files.copy(path, zipOut);
zipOut.closeEntry();
} catch (IOException e) {
throw new UncheckedIOException(e);
}
});
}
// ZIP 파일 전송
MultiValueMap<String, Object> body = new LinkedMultiValueMap<>();
body.add("file", new FileSystemResource(zipFile));
restClient.post()
.uri("/api/v1/upload/hls")
.contentType(MediaType.MULTIPART_FORM_DATA)
.body(body)
.retrieve()
.body(String.class);
} catch (RestClientException e) {
throw new IOException("Upload failed", e);
} finally {
zipFile.deleteOnExit();
directoryManager.deleteIfExists(hlsDirectory);
}
}
마지막으로 한가지만 더 보고 가자면 Convert Server -> Storage Server 로 파일을 전송할때 zip 파일로 압축해서 전송하게됩니다.
ffmpeg
@Repository
@RequiredArgsConstructor
public class VideoRepository {
private final PathManager pathManager;
private final FFmpegManager ffmpegManager;
private final DirectoryManager directoryManager;
public Path processVideo(File video, String videoId) throws IOException {
Path videoPath = null;
try {
videoPath = Files.createDirectories(pathManager.get(PathType.VIDEO, videoId));
Process process = ffmpegManager.convert(videoPath, video.toPath());
printLog(process);
// 프로세스 완료 대기
int exitCode = process.waitFor();
if (exitCode != 0) {
Thread.currentThread().interrupt();
throw new RuntimeException("FFmpeg process failed with exit code: " + exitCode);
}
} catch (Exception e) {
if (videoPath != null) {
directoryManager.deleteIfExists(videoPath);
}
throw new IOException(e);
}
return videoPath;
}
private void printLog(Process process) throws IOException {
// FFmpeg 출력 로그 확인
try (var reader = new BufferedReader(new InputStreamReader(process.getInputStream()))) {
reader.lines().forEach(line -> System.out.println("FFmpeg: " + line));
}
}
}
이 프로젝트에서 가장 중요한 핵심 로직입니다. ffmpeg 를 이용해서 동영상을 스트리밍 파일로 변환하는 과정이죠.
@Component
@RequiredArgsConstructor
public class FFmpegManagerImpl implements FFmpegManager {
@Value("${path.ffmpeg}")
private String ffmpegPath;
private final FFprobeManager ffprobeManager;
@Override
public Process convert(Path videoPath, Path originalVideoPath) throws IOException {
FFprobeResult analyze = ffprobeManager.analyze(originalVideoPath);
var command = FFmpegCommand.newCommand(ffmpegPath, videoPath, originalVideoPath, analyze)
.buildCommand();
return new ProcessBuilder(command)
.redirectErrorStream(true)
.start();
}
}
원래 command를 작성하는 부분이 상당히 길고, 동적으로 처리하지 못하는 등(동영상 화질을 하드코딩해서 설정하거나 업로드된 동영상의 화질은 480p 이지만 command에 720p 가 있다면 720p 도 생성되는 문제 등) 불편한 부분이 있었습니다. 그래서 command 를 동적으로 생성하도록 FFmpegCommand 클래스를 하나 만들었습니다. 그리고 그 command를 생성하기 위해서 동영상 정보를 분석하는 FFprobeManager 를 만들어서 동영상의 width, height, fps, audio 여부 등 데이터를 반환하도록 했습니다.
@Component
@RequiredArgsConstructor
public class FFprobeManager {
private final ObjectMapper mapper;
@Value("${path.ffprobe}")
private String ffprobePath;
public FFprobeResult analyze(Path originalVideoPath) throws IOException {
try {
// FFprobe 실행
Process process = convert(originalVideoPath);
// 결과 읽기
String json = readJson(process);
// 프로세스 완료 대기
int exitCode = process.waitFor();
if (exitCode != 0) {
throw new RuntimeException("FFprobe failed with exit code: " + exitCode);
}
return parseFFprobeResult(json);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IOException("FFprobe process interrupted", e);
}
}
private String readJson(Process process) throws IOException {
try (BufferedReader reader = new BufferedReader(new InputStreamReader(process.getInputStream(), StandardCharsets.UTF_8))) {
StringBuilder output = new StringBuilder();
String line;
while ((line = reader.readLine()) != null) {
output.append(line);
}
return output.toString();
}
}
private Process convert(Path originalVideoPath) throws IOException {
List<String> command = List.of(
ffprobePath,
"-v", "error",
"-show_entries", "stream=codec_type,width,height,r_frame_rate",
"-of", "json",
originalVideoPath.toString()
);
return new ProcessBuilder(command)
.redirectErrorStream(true)
.start();
}
private FFprobeResult parseFFprobeResult(String json) throws IOException {
JsonNode root = mapper.readTree(json);
JsonNode streams = root.path("streams");
// 비디오 스트림 찾기
JsonNode videoStream = null;
boolean hasAudio = false;
for (JsonNode stream : streams) {
String codecType = stream.path("codec_type").asText();
switch (codecType) {
case "video" -> videoStream = stream;
case "audio" -> hasAudio = true;
}
}
if (videoStream == null) {
throw new IOException("No video stream found");
}
int width = videoStream.path("width").asInt();
int height = videoStream.path("height").asInt();
double fps = calculateFps(videoStream.path("r_frame_rate").asText());
return new FFprobeResult(width, height, fps, hasAudio);
}
private double calculateFps(String frameRate) {
String[] parts = frameRate.split("/");
if (parts.length == 2) {
double numerator = Double.parseDouble(parts[0]);
double denominator = Double.parseDouble(parts[1]);
return numerator / denominator;
}
return Double.parseDouble(frameRate);
}
}
@AllArgsConstructor(access = AccessLevel.PRIVATE)
public class FFmpegCommand {
private final String ffmpegPath;
private final Path videoPath;
private final Path originalVideoPath;
private final VideoStream videoStream;
private final AudioStream audioStream;
public static FFmpegCommand newCommand(String ffmpegPath, Path videoPath, Path originalVideoPath, FFprobeResult probeResult) {
return new FFmpegCommand(ffmpegPath, videoPath, originalVideoPath, probeResult.toVideoStream(), probeResult.toAudioStream());
}
public List<String> buildCommand() {
List<String> command = new ArrayList<>();
command.addAll(List.of(ffmpegPath, "-i", originalVideoPath.toString()));
command.addAll(List.of("-filter_complex", buildFilterComplex()));
command.addAll(buildMaps());
command.addAll(List.of("-c:v", "libx264"));
if (audioStream.exists()) {
command.addAll(List.of("-c:a", "aac"));
}
command.addAll(buildBitrates());
command.addAll(List.of(
"-var_stream_map", buildVarStreamMap(),
"-f", "hls",
"-hls_time", "10",
"-hls_list_size", "0",
"-hls_segment_type", "mpegts",
"-hls_segment_filename", videoPath + "/stream_%v/segment_%03d.ts",
"-master_pl_name", "master.m3u8",
videoPath + "/stream_%v/playlist.m3u8"
));
return command;
}
private String buildFilterComplex() {
List<String> scales = videoStream.getScales();
String splitCount = String.format("[0:v]split=%d", scales.size());
String outputs = scales.stream()
.map(s -> String.format("[v%d]", scales.indexOf(s) + 1))
.collect(Collectors.joining(""));
return splitCount + outputs + ";" + String.join(";", scales);
}
private List<String> buildBitrates() {
List<String> bitrates = new ArrayList<>();
List<String> qualities = videoStream.getAvailableQualities();
for (int i = 0; i < qualities.size(); i++) {
bitrates.add("-b:v:" + i);
bitrates.add(VideoStream.BITRATE.get(qualities.get(i)));
}
return bitrates;
}
private String buildVarStreamMap() {
List<String> streamMaps = new ArrayList<>();
List<String> qualities = videoStream.getAvailableQualities();
for (int i = 0; i < qualities.size(); i++) {
String quality = qualities.get(i);
streamMaps.add(audioStream.exists()
? String.format("v:%d,a:%d,name:%sp", i, i, quality)
: String.format("v:%d,name:%sp", i, quality));
}
return String.join(" ", streamMaps);
}
private List<String> buildMaps() {
List<String> maps = new ArrayList<>();
List<String> qualities = videoStream.getAvailableQualities();
for (String quality : qualities) {
maps.add("-map");
maps.add(String.format("[v%sp]", quality));
if (audioStream.exists()) {
maps.add("-map"); maps.add("0:a");
}
}
return maps;
}
}
public record VideoStream(int width, int height, double fps) {
public static final Map<String, String> BITRATE = Map.of(
"2160", "20000k", // 4K - 20Mbps
"1440", "12000k", // 2K - 12Mbps
"1080", "6000k", // FHD - 6Mbps
"720", "2500k", // HD - 2.5Mbps
"480", "1500k", // SD - 1.5Mbps
"360", "800k", // Low - 800Kbps
"240", "400k", // Very Low - 400Kbps
"144", "200k" // Lowest - 200Kbps
);
private static final Map<String, String> resolutions = Map.of(
"2160", "3840:2160",
"1440", "2560:1440",
"1080", "1920:1080",
"720", "1280:720",
"480", "854:480",
"360", "640:360",
"240", "426:240",
"144", "256:144"
);
public List<String> getAvailableQualities() {
List<String> qualities = new ArrayList<>();
if (height >= 2160) qualities.add("2160"); // 4K (3840x2160)
if (height >= 1440) qualities.add("1440"); // 2K (2560x1440)
if (height >= 1080) qualities.add("1080"); // Full HD (1920x1080)
if (height >= 720) qualities.add("720"); // HD (1280x720)
if (height >= 480) qualities.add("480"); // SD (854x480)
qualities.add("360"); // Low (640x360)
return qualities;
}
public List<String> getScales() {
List<String> scales = new ArrayList<>();
List<String> qualities = getAvailableQualities();
for (int i = 0; i < qualities.size(); i++) {
int streamIndex = qualities.size() - i;
String quality = qualities.get(i);
String scale = resolutions.get(quality);
scales.add(String.format("[v%d]scale=%s[v%sp]", streamIndex, scale, quality));
}
return scales;
}
}
public record AudioStream(boolean exists) {
}
ffmpeg 관련해서는 라이브러리로 만들어서 사용할 예정입니다. 만들어보니 다시는 만들고 싶지 않았거든요...
@Slf4j
@Configuration
@RequiredArgsConstructor
@EnableScheduling
public class ExecutorMonitor {
private final ThreadPoolExecutor videoExecutor;
private final BlockingQueue<Runnable> videoQueue;
@Scheduled(fixedDelay = 5000)
public void monitorVideo() {
// 현재 처리중인 스레드 수와 큐에 남은 처리 수 확인
log.info("최대 허용 스레드 수 : {}, 처리중인 스레드 수 : {}, 대기중인 스레드 수 : {}",
videoExecutor.getMaximumPoolSize(), videoExecutor.getActiveCount(), videoQueue.size());
}
}
마지막으로 스레드 모니터를 만들어서 얼마나 처리중인지, 대기중인 스레드는 몇개인지 이렇게 모니터링 해놓으면 완성입니다.
작성하지 않은 부분은 그다기 중요하지 않기도하고 전부 넣으면 글이 너무 길어지기에 추가하지 않았습니다.
Storage Server
resource
@RestController
@RequiredArgsConstructor
public class ResourceController implements DefaultResourceMethod {
private final PathManager pathManager;
@GetMapping("/original/{filename}")
public Resource downloadOriginal(@PathVariable String filename) throws MalformedURLException {
Path originalPath = pathManager.get(ORIGINAL_VIDEO, filename);
return resource(originalPath);
}
@GetMapping("/thumbnail/{filename}")
public Resource downloadImage(@PathVariable String filename) throws MalformedURLException {
Path thumbnailPath = pathManager.get(THUMBNAIL, filename);
return resource(thumbnailPath);
}
@GetMapping("/video/{videoId}/{master}.m3u8")
public Resource downloadVideo(@PathVariable String videoId, @PathVariable String master) throws MalformedURLException {
Path playlistPath = pathManager.get(VIDEO, videoId, M3U8.appendTo(master));
return resource(playlistPath);
}
@GetMapping("/video/{videoId}/{stream}/{segment}")
public Resource downloadPlaylist(@PathVariable String videoId, @PathVariable String stream, @PathVariable String segment) throws MalformedURLException {
Path segmentPath = pathManager.get(VIDEO, videoId, stream, segment);
return resource(segmentPath);
}
}
리소스는 똑같습니다.
@Slf4j
@RestController
@RequiredArgsConstructor
@RequestMapping("/api/v1/upload")
public class UploadController {
private final ImageService imageService;
private final VideoService videoService;
private final ZipService zipService;
@PostMapping("/thumbnail")
public ResponseEntity<StorageResponse> uploadThumbnail(@RequestParam("file") MultipartFile image) throws IOException {
log.info("UPLOAD Thumbnail : {} ", image.getOriginalFilename());
String storeKey = imageService.uploadThumbnail(image);
return ResponseEntity.ok(new StorageResponse(storeKey, "Thumbnail uploaded successfully"));
}
@PostMapping("/original")
public ResponseEntity<StorageResponse> uploadOriginalVideo(@RequestParam("file") MultipartFile originalVideo) throws IOException {
log.info("UPLOAD Original Video : {} ", originalVideo.getOriginalFilename());
String storeKey = videoService.uploadOriginalVideo(originalVideo);
return ResponseEntity.ok(new StorageResponse(storeKey, "Original video uploaded successfully"));
}
@PostMapping("/hls")
public ResponseEntity<Void> uploadHls(@RequestParam("file") MultipartFile zipFile) throws IOException {
log.info("UPLOAD HLS VIDEO : {} ", zipFile.getOriginalFilename());
zipService.unzip(PathType.VIDEO, zipFile);
return ResponseEntity.ok().build();
}
}
Convert Server -> Storage Server 로 스트리밍 데이터를 전송할때 압축해서 보내게됩니다. 따라서 unzip을 한 후에 데이터를 적재하는 과정이 있습니다.
@Component
@RequiredArgsConstructor
public class ZipService {
private final PathManager pathManager;
public void unzip(PathType pathType, MultipartFile zipFile) throws IOException {
Path basePath = pathManager.get(pathType);
try (ZipInputStream zis = new ZipInputStream(zipFile.getInputStream())) {
ZipEntry entry;
while ((entry = zis.getNextEntry()) != null) {
try {
extractEntry(zis, entry, basePath);
} finally {
zis.closeEntry();
}
}
}
}
private void extractEntry(ZipInputStream zis, ZipEntry entry, Path basePath) throws IOException {
Path filePath = basePath.resolve(entry.getName());
if (entry.isDirectory()) {
Files.createDirectories(filePath);
return;
}
Files.createDirectories(filePath.getParent());
Files.copy(zis, filePath, StandardCopyOption.REPLACE_EXISTING);
}
}
@Component
@RequiredArgsConstructor
public class ImageConverter {
@Value("${path.ffmpeg}")
private String ffmpegPath;
public Path convertToWebP(MultipartFile image, Path outputPath, int quality) throws IOException {
String originalFilename = image.getOriginalFilename();
// 임시 파일 생성
Path tempFile = Files.createTempFile("upload_", "_" + originalFilename);
try {
// MultipartFile을 임시 파일로 저장
Files.copy(image.getInputStream(), tempFile, StandardCopyOption.REPLACE_EXISTING);
// WebP로 변환
convertToWebP(tempFile, outputPath, quality);
return outputPath;
} finally {
// 임시 파일 삭제
Files.deleteIfExists(tempFile);
}
}
private void convertToWebP(Path beforeImage, Path afterImage, int quality) throws IOException {
// 품질 0 - 100 으로 변환
int validQuality = Math.min(100, Math.max(0, quality));
List<String> command = List.of(
ffmpegPath,
"-i", beforeImage.toString(),
"-quality", String.valueOf(validQuality),
"-y", // 기존 파일 덮어쓰기
afterImage.toString()
);
Process process = new ProcessBuilder(command)
.redirectErrorStream(true)
.start();
// 프로세스 완료 대기
try {
int exitCode = process.waitFor();
if (exitCode != 0) {
throw new RuntimeException("FFmpeg process failed with exit code: " + exitCode);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IOException("FFmpeg process interrupted", e);
}
}
}
썸네일 사진은 webP 로 변환하여 데이터를 줄입니다. 이것도 ffmpeg 에서 해줍니다.
결론
시작은 호기심에 비디오 플레이어를 만들어보는 것으로 시작했는데 만들고 보니 너무 스케일이 커져서 여기까지 오게되었네요...
덕분에 비디오 스트리밍, HLS, RabbitMQ 에 대해서 정말 많이 공부가 되었던것 같습니다. 근데 아직 부족한 부분도 많고 수정하거나 추가하고 싶은 기능들이 아직 있어서 완성된것 같으면 그때 깃에 올려서 공유해보도록 하겠습니다.
이 프로젝트 전에 이벤트쿠폰이나 이벤트주문 기능을 만들면서 Thread를 처음 접했었는데 확실히 해보고나니까 이번 Thread 부분은 막힘없이 구현했던것 같습니다.
언제나 그렇듯 제 코드는 그냥 참고만 해주세요.
'FrameWork > Spring' 카테고리의 다른 글
[Spring Boot] 비디오 스트리밍 서버 만들기 (1) - 비디오 플레이어 (0) | 2025.02.13 |
---|---|
[Spring Boot] 이벤트 주문을 만들어보자 (3) (2) | 2025.01.21 |
[Spring Boot] 선착순 이벤트 쿠폰을 만들어보자 (2) - Redis (0) | 2025.01.19 |
[Spring Boot] 선착순 이벤트 쿠폰을 만들어보자 (1) - 동시성 이슈 (0) | 2025.01.16 |
[Spring Security][OAuth2][Resource Server] JWT 인증 (0) | 2025.01.13 |