diff --git a/batch/rss/src/main/kotlin/site/techmoa/batch/rss/port/BlogPort.kt b/batch/rss/src/main/kotlin/site/techmoa/batch/rss/port/BlogPort.kt deleted file mode 100644 index 87e095b..0000000 --- a/batch/rss/src/main/kotlin/site/techmoa/batch/rss/port/BlogPort.kt +++ /dev/null @@ -1,8 +0,0 @@ -package site.techmoa.batch.rss.port - -import site.techmoa.batch.rss.domain.model.Blog -import site.techmoa.batch.rss.domain.model.BlogStatus - -interface BlogPort { - fun findAllBy(active: BlogStatus): List -} \ No newline at end of file diff --git a/batch/rss/src/main/kotlin/site/techmoa/batch/rss/support/RssClient.kt b/batch/rss/src/main/kotlin/site/techmoa/batch/rss/support/RssClient.kt deleted file mode 100644 index 36a7dac..0000000 --- a/batch/rss/src/main/kotlin/site/techmoa/batch/rss/support/RssClient.kt +++ /dev/null @@ -1,8 +0,0 @@ -package site.techmoa.batch.rss.support - -import site.techmoa.batch.rss.domain.model.Article -import site.techmoa.batch.rss.domain.model.Blog - -interface RssClient { - fun fetch(blog: Blog): List
-} \ No newline at end of file diff --git a/boot/build.gradle.kts b/boot/build.gradle.kts index 0773e7a..cc618f9 100644 --- a/boot/build.gradle.kts +++ b/boot/build.gradle.kts @@ -6,12 +6,15 @@ dependencies { implementation(project(":domain")) implementation(project(":presentation")) implementation(project(":application")) - implementation(project(":batch:rss")) - implementation(project(":batch:schedules")) + + implementation(project(":worker:rss")) + implementation(project(":worker:scheduler")) + implementation(project(":infrastructure:oauth")) - implementation(project(":infrastructure:mysql")) implementation(project(":infrastructure:jpa")) implementation(project(":infrastructure:rest")) + implementation(project(":infrastructure:kafka")) + implementation(project(":infrastructure:mysql")) implementation("org.springframework.boot:spring-boot-starter") testRuntimeOnly("com.h2database:h2") diff --git a/boot/src/main/resources/application.yml b/boot/src/main/resources/application.yml index f1f5f07..6a66c3e 100644 --- a/boot/src/main/resources/application.yml +++ b/boot/src/main/resources/application.yml @@ -14,11 +14,13 @@ spring: - infrastructure-mysql-local - infrastructure-jwt-local - infrastructure-jpa-local + - infrastructure-kafka-local prod: - prod - infrastructure-mysql-prod - infrastructure-jwt-prod - infrastructure-jpa-prod + - infrastructure-kafka-prod management: endpoints: diff --git a/boot/src/test/resources/application-test.yml b/boot/src/test/resources/application-test.yml index 6c8dbaa..21824e5 100644 --- a/boot/src/test/resources/application-test.yml +++ b/boot/src/test/resources/application-test.yml @@ -20,6 +20,22 @@ spring: dialect: org.hibernate.dialect.MySQLDialect open-in-view: false + kafka: + bootstrap-servers: localhost:9092 # 필수: Kafka 브로커 주소 + + producer: + acks: all # 권장 + properties: + enable.idempotence: true # 권장: 중복 발행 방지 + linger.ms: 10 # 선택: 배치 지연 시간 + delivery.timeout.ms: 30000 # 선택: 전송 타임아웃 + + consumer: + group-id: webhook-discord-local # 필수: subscriber consumer group + auto-offset-reset: earliest # 선택: offset이 없을 때 처음부터 읽기 + enable-auto-commit: false # 권장: retry/DLT 처리 시 자동 커밋 비활성화 + max-poll-records: 10 # 선택: 한 번에 읽을 최대 레코드 수 + logging: level: org.hibernate.SQL: debug diff --git a/docs/discord-webhook-outbox-backlog-report.md b/docs/discord-webhook-outbox-backlog-report.md new file mode 100644 index 0000000..b35e5b9 --- /dev/null +++ b/docs/discord-webhook-outbox-backlog-report.md @@ -0,0 +1,40 @@ +# 디스코드 웹훅 아웃박스 적체 처리 시간 보고서 + +## 전제 + +- 아티클 수: `20` +- 유저 수: `5000` +- 아웃박스 `PENDING` 레코드 수: `20 x 5000 = 100000` +- 1회 스캔당 처리 건수: `10` +- 스캔 주기: `2분` + +## 계산 + +### 1) 필요한 스캔 횟수 + +`100000 / 10 = 10000`회 + +### 2) 총 소요 시간 + +- 스캔 1회를 시작한 뒤, 다음 스캔까지 `2분`씩 기다린다고 보면 +- `10000 x 2분 = 20000분` + +이를 시간과 일수로 바꾸면: + +- `20000분 = 333시간 20분` +- `333시간 20분 = 13일 21시간 20분` + +### 3) 엄밀한 해석 + +첫 스캔이 즉시 시작되고, 이후에만 2분 간격이 적용된다면 마지막 스캔까지의 경과 시간은 + +- `(10000 - 1) x 2분 = 19998분` +- `19998분 = 333시간 18분 = 13일 21시간 18분` + +즉, 해석 차이는 `2분`입니다. + +## 결론 + +아웃박스에 `100000`개의 `PENDING` 레코드가 쌓인 상황에서, 한 번에 `10`개씩 `2분` 주기로 처리한다면 전체를 `SUCCESS`로 바꾸는 데 필요한 시간은 대략 `2만 분`, 즉 약 `13일 21시간`입니다. + +엄밀하게는 첫 배치가 즉시 실행된다고 가정할 때 `19,998분`이므로, 최종 완료 시점은 약 `13일 21시간 18분`입니다. diff --git a/docs/discord-webhook-outbox-throughput-options.md b/docs/discord-webhook-outbox-throughput-options.md new file mode 100644 index 0000000..5ff2648 --- /dev/null +++ b/docs/discord-webhook-outbox-throughput-options.md @@ -0,0 +1,193 @@ +# 디스코드 웹훅 아웃박스 처리량 개선안 + +## 문제 요약 + +현재 구조는 아웃박스 테이블의 `PENDING` 레코드를 2분 주기로 10개씩 스캔해서 전송하는 방식이다. +이 구조에서는 유저 수와 아티클 수가 늘어날수록 `PENDING` 적체가 선형적으로 커지고, 완료까지 걸리는 시간이 급격히 길어진다. + +예시로: + +- 아티클 `20`개 +- 유저 `5000`명 +- 총 `PENDING` 레코드 `100000`개 +- 처리 속도 `10건 / 2분` + +이 경우 전체 처리 완료까지 대략 `13일 21시간`이 걸린다. + +즉, 문제의 본질은 "전송 로직이 너무 느리다"기보다 "단위 시간당 병렬로 처리할 수 있는 양이 너무 적다"는 점이다. + +--- + +## 개선 방향의 판단 기준 + +처리량 개선안을 볼 때는 아래 기준으로 판단하는 것이 좋다. + +- 실제 초당/분당 처리량이 늘어나는가 +- 버스트 트래픽을 흡수할 수 있는가 +- 실패 재시도와 장애 격리가 쉬운가 +- 운영 복잡도가 감당 가능한가 +- 중복 발송 방지와 순서 보장이 가능한가 + +--- + +## 1. Future 구현체를 사용한 병렬 처리 + +### 개념 + +스캔한 `PENDING` 레코드를 하나씩 순차 처리하지 않고, `Future`, `CompletableFuture`, 혹은 스레드풀 기반 작업으로 동시에 여러 건 처리하는 방식이다. + +### 기대 효과 + +- 같은 스캔 단위에서 전송 처리 시간을 줄일 수 있다 +- 구현 난이도가 비교적 낮다 +- 현재 아웃박스 구조를 크게 바꾸지 않고도 적용 가능하다 + +### 한계 + +- 병렬 수를 무제한으로 늘리면 외부 디스코드 API rate limit에 걸릴 수 있다 +- DB 락 경합, 스레드풀 고갈, 커넥션 풀 부족 문제가 생길 수 있다 +- 애플리케이션 인스턴스가 1개면 수평 확장 효과는 제한적이다 +- 장애 발생 시 재시도 정책과 중복 발송 방지를 별도로 잘 설계해야 한다 + +### 결론 + +가장 빨리 적용할 수 있는 개선안이지만, 장기적으로는 운영 안정성보다 "속도만 올리는" 성격이 강하다. + +--- + +## 2. 소비자 인스턴스를 복수 개로 늘리는 스케일 아웃 + +### 개념 + +같은 소비 로직을 여러 인스턴스에서 동시에 실행한다. +예를 들어 워커를 1개에서 5개로 늘리면, 같은 시간에 더 많은 아웃박스 레코드를 처리할 수 있다. + +### 기대 효과 + +- 가장 직관적인 처리량 증가 방법이다 +- 인스턴스 수를 늘리는 것만으로 처리량 확장이 가능하다 +- 트래픽 증가에 따라 탄력적으로 대응할 수 있다 + +### 한계 + +- 각 인스턴스가 같은 레코드를 집어가지 않도록 분산 락, 상태 선점, SKIP LOCKED 같은 장치가 필요하다 +- 인스턴스 수가 늘어나도 외부 API rate limit은 그대로다 +- 장애 시 중복 처리 가능성이 커지므로 멱등성 설계가 필수다 +- 테이블 스캔 방식이 그대로면 DB 부하도 같이 커진다 + +### 결론 + +처리량 자체는 분명히 늘릴 수 있지만, "어떻게 안전하게 분산할 것인가"가 핵심 난제가 된다. + +--- + +## 3. 메시지 큐를 도입해서 Pub/Sub 구조로 만드는 방식 + +### 핵심 정리 + +메시지 큐는 그 자체가 처리량을 자동으로 높여주지는 않는다. +대신 다음 두 가지를 가능하게 한다. + +- 생산자와 소비자를 분리해서 버스트를 흡수한다 +- 소비자를 수평 확장해서 병렬 처리량을 높인다 + +즉, 큐는 "처리량 증가의 직접 원인"이라기보다 "처리량을 안정적으로 늘릴 수 있게 해주는 기반"이다. + +### 왜 의미가 있는가 + +현재 구조는 DB 아웃박스 테이블을 주기적으로 훑는 폴링 모델이다. +이 모델은 레코드가 많아질수록 다음 문제가 커진다. + +- 불필요한 테이블 스캔 비용 +- 처리 지연 +- 스캔 주기 의존성 +- 확장 시 락/경합 문제 + +메시지 큐를 넣으면 아웃박스는 "대기열" 역할에 집중하고, 실제 전송은 큐 컨슈머가 담당하게 만들 수 있다. + +### 추천 아키텍처 + +```mermaid +flowchart LR + A[Article Event 발생] --> B[Outbox 저장] + B --> C[Outbox Relay / Publisher] + C --> D[(Message Queue)] + D --> E1[Consumer 1] + D --> E2[Consumer 2] + D --> E3[Consumer N] + E1 --> F[Discord Webhook] + E2 --> F + E3 --> F + F --> G[Delivery Result 저장] +``` + +### 동작 방식 + +1. 아티클 생성 또는 발행 이벤트가 발생한다. +2. 애플리케이션은 아웃박스 테이블에 전달할 메시지를 `PENDING` 상태로 저장한다. +3. 별도의 퍼블리셔가 아웃박스를 읽어 메시지 큐에 발행한다. +4. 여러 컨슈머가 큐를 구독하면서 디스코드 웹훅을 병렬로 전송한다. +5. 성공 시 `SUCCESS`, 실패 시 재시도 큐 또는 `RETRY` 상태로 이동한다. + +### 장점 + +- 생산과 소비를 분리해 장애 전파를 줄일 수 있다 +- 큐 길이로 적체 상황을 관찰하기 쉽다 +- 컨슈머 수를 늘려 처리량을 유연하게 조절할 수 있다 +- 실패 재처리, DLQ, 지연 재시도 같은 운영 전략을 붙이기 좋다 + +### 단점 + +- 시스템 구성요소가 늘어나고 운영 복잡도가 올라간다 +- exactly-once 처리는 현실적으로 어렵고, 결국 멱등성이 필요하다 +- 중복 발송 방지를 위해 메시지 키, deduplication, idempotency key가 필요하다 +- 큐와 DB 상태를 함께 관리해야 하므로 설계 난도가 높다 + +### 결론 + +처리량 개선과 운영 안정성을 함께 가져가려면 3번이 가장 설계적으로 낫다. +특히 "나중에 컨슈머를 여러 개로 늘릴 수 있는 구조"를 만들 수 있어서, 장기 확장성 측면에서 유리하다. + +--- + +## 비교 요약 + +| 방식 | 처리량 증가 효과 | 구현 난이도 | 운영 안정성 | 확장성 | 추천도 | +| --- | --- | --- | --- | --- | --- | +| Future 병렬 처리 | 중간 | 낮음 | 중간 | 낮음 | 보조안 | +| 소비자 스케일 아웃 | 높음 | 중간 | 중간 | 높음 | 유효한 직접안 | +| 메시지 큐 + Pub/Sub | 높음 | 높음 | 높음 | 매우 높음 | 최종 추천 | + +--- + +## 내가 추천하는 방향 + +이번 문제를 단순히 "더 빨리 보내기"로 보면 1번이나 2번으로도 일정 수준 해결할 수 있다. +하지만 장기적으로는 다음 이유로 3번이 더 적절하다. + +- 향후 유저 수와 아티클 수 증가를 감당하기 쉽다 +- 발송 실패, 재시도, DLQ, 모니터링을 붙이기 좋다 +- 스캔 기반 폴링을 점차 줄이고 이벤트 기반으로 이동할 수 있다 +- 시스템 경계를 명확히 나눌 수 있다 + +따라서 경험 목적과 설계 완성도를 같이 생각하면, `3번 메시지큐 도입`을 중심으로 잡는 것이 맞다. + +--- + +## 현실적인 단계적 도입안 + +1. 먼저 현재 아웃박스 구조는 유지한다. +2. 아웃박스 레코드를 큐로 발행하는 퍼블리셔를 분리한다. +3. 컨슈머에서 디스코드 웹훅 전송을 담당하게 한다. +4. 컨슈머를 여러 개로 늘릴 수 있게 멱등성과 분산 처리를 정리한다. +5. 충분히 안정화되면 테이블 스캔 중심 구조를 축소한다. + +--- + +## 정리 + +- 1번은 가장 빠른 개선안이다. +- 2번은 처리량을 직접 늘릴 수 있지만 분산 제어가 필요하다. +- 3번은 시스템적으로 가장 안정적으로 확장 가능한 방식이다. + +이번 케이스에서는 `3번`을 중심 설계로 잡고, 필요하면 `1번`을 보조 최적화로 섞는 접근이 가장 합리적이다. diff --git a/docs/kafka-vs-rabbitmq-decision-report.md b/docs/kafka-vs-rabbitmq-decision-report.md new file mode 100644 index 0000000..1f8920b --- /dev/null +++ b/docs/kafka-vs-rabbitmq-decision-report.md @@ -0,0 +1,243 @@ +# Kafka vs RabbitMQ 의사결정 보고서 + +## 배경 + +현재 목표는 디스코드 웹훅 알림을 아웃박스 기반으로 발행하고, 이를 별도 `sub` 인스턴스에서 소비하는 구조로 바꾸는 것이다. +지금은 `pub`, `sub` 모두 싱글 인스턴스로 시작하지만, 이후에는 `sub` 인스턴스를 분리하고 수평 확장까지 고려해야 한다. + +구현해야 할 기능은 다음과 같다. + +- 메시지 발행 +- 알림 전송 실패 시 재시도 큐 적재 +- 최종 실패 메시지는 DLQ에 적재하여 관리자 수동 관리 +- `sub` 인스턴스 분리 및 스케일 아웃 고려 + +--- + +## 결론 요약 + +### 최종 추천 + +`Kafka`를 선택하는 편이 더 낫다. + +### 이유 + +- `sub`를 별도 서비스로 분리하고, 이후 인스턴스를 늘리는 구조와 잘 맞는다 +- 메시지 재처리, 재생(replay), 소비자 그룹 기반 확장이 자연스럽다 +- 이번 프로젝트를 통해 Kafka를 학습하고 싶은 목적과도 일치한다 + +### 단, 중요한 전제 + +Kafka가 retry/DLQ를 자동으로 잘 해결해주는 것은 아니다. +오히려 `retry topic`, `DLQ topic`, `attempt count`, `nextRetryAt` 같은 설계를 직접 만들어야 한다. + +즉: + +- `RabbitMQ`는 이 기능들을 더 쉽게 구현한다 +- `Kafka`는 이 기능들을 더 확장성 있게 운영할 수 있다 + +--- + +## 비교 기준 + +이 의사결정은 다음 기준으로 봐야 한다. + +- 현재는 싱글 인스턴스지만, 이후 수평 확장이 쉬운가 +- 실패 재시도와 최종 실패 관리가 자연스러운가 +- 운영과 디버깅이 단순한가 +- 메시지 재처리와 replay가 가능한가 +- 학습 및 향후 확장 가치가 있는가 + +--- + +## Kafka와 RabbitMQ 비교 + +| 항목 | Kafka | RabbitMQ | +| --- | --- | --- | +| 현재 싱글 인스턴스 운영 | 가능 | 가능 | +| `sub` 분리 후 확장 | 매우 유리 | 유리 | +| 소비자 수평 확장 | 매우 유리 | 유리 | +| 메시지 replay | 매우 유리 | 제한적 | +| retry/DLQ 구현 난이도 | 중간~높음 | 낮음 | +| 작업 큐 의미론 | 중간 | 매우 유리 | +| 운영 복잡도 | 높음 | 중간 | +| 학습 가치 | 높음 | 중간 | + +--- + +## 기능별 적합성 + +### 1. 메시지 발행 + +두 제품 모두 가능하다. + +- Kafka: 토픽에 이벤트를 append하는 구조라 이벤트 발행 모델에 잘 맞는다 +- RabbitMQ: 라우팅과 큐 적재가 직관적이라 작업 발행 모델에 잘 맞는다 + +이 기능만 놓고 보면 둘 다 충분하다. + +### 2. 실패 시 재시도 큐 + +이 기능은 RabbitMQ가 더 단순하다. + +- RabbitMQ는 TTL + DLX 조합으로 지연 재시도 패턴을 만들기 쉽다 +- Kafka는 retry topic을 따로 두고, 소비자가 재시도 시점에 맞춰 다시 읽도록 설계해야 한다 + +즉, retry 자체의 구현 편의성은 RabbitMQ가 좋다. + +### 3. 최종 실패 메시지 DLQ + +두 제품 모두 가능하지만, 방식이 다르다. + +- RabbitMQ: DLX/DLQ 개념이 자연스럽고 설정이 단순하다 +- Kafka: `*.dlq` 토픽으로 별도 적재하는 방식이 일반적이다 + +관리자 수동 관리라는 목적에는 둘 다 잘 맞는다. + +### 4. sub 분리와 스케일 아웃 + +여기서는 Kafka가 더 강하다. + +- Kafka는 consumer group으로 여러 인스턴스를 같은 consumer 역할로 묶기 쉽다 +- 파티션 수만 충분하면, 소비자 인스턴스를 늘려 병렬 처리량을 키우기 좋다 +- 메시지 replay가 쉬워서 운영 중 복구나 재처리에 유리하다 + +RabbitMQ도 수평 확장이 가능하지만, 이후 이벤트 스트림 관점의 확장성은 Kafka가 더 좋다. + +--- + +## 추천 판단 + +### RabbitMQ가 더 나은 경우 + +- 목표가 빠른 구현과 단순한 운영이다 +- retry/DLQ를 최소한의 설계로 처리하고 싶다 +- 메시지 큐를 거의 작업 큐처럼 쓰고 싶다 + +### Kafka가 더 나은 경우 + +- `sub`를 별도 서비스로 분리하고, 이후 인스턴스 확장을 전제로 한다 +- 이벤트 재처리와 replay가 중요하다 +- 장기적으로 이벤트 스트리밍 구조로 확장할 가능성이 있다 +- 이번 기회에 Kafka를 학습하고 싶다 + +현재 요구를 보면 `Kafka` 쪽이 더 맞다. +특히 `sub`를 분리하고 스케일 아웃까지 고려하는 순간, 단순 작업 큐보다 이벤트 스트림에 가까운 구조가 되기 때문이다. + +--- + +## 권장 아키텍처 + +```mermaid +flowchart LR + A[Domain Event 발생] --> B[Outbox 저장] + B --> C[Outbox Publisher] + C --> D[(Kafka: notification.webhook)] + D --> E[Webhook Consumer Service] + E --> F[Discord Webhook 호출] + F --> G{성공?} + G -- Yes --> H[Success 저장] + G -- No --> I[(Kafka: notification.webhook.retry)] + I --> J{재시도 횟수 초과?} + J -- No --> E + J -- Yes --> K[(Kafka: notification.webhook.dlq)] + K --> L[관리자 수동 처리] +``` + +--- + +## Kafka 설계안 + +### 토픽 구성 + +- `notification.webhook`: 정상 발행 토픽 +- `notification.webhook.retry`: 재시도 토픽 +- `notification.webhook.dlq`: 최종 실패 토픽 + +### 메시지 키 + +권장 키는 다음 중 하나다. + +- `outboxId` +- `articleId + userId` + +중복 방지와 추적성을 생각하면 `outboxId`가 가장 단순하다. + +### 메시지 헤더 + +재시도와 운영 관리를 위해 아래 헤더를 두는 것이 좋다. + +- `attempt` +- `nextRetryAt` +- `errorCode` +- `errorMessage` +- `createdAt` + +### 소비자 동작 + +1. `notification.webhook`을 consume한다. +2. 디스코드 웹훅 전송을 시도한다. +3. 성공하면 해당 메시지를 성공 처리한다. +4. 실패하면 `attempt`를 증가시켜 `notification.webhook.retry`로 다시 publish한다. +5. 재시도 횟수가 기준을 넘으면 `notification.webhook.dlq`로 이동한다. + +### 재시도 전략 + +Kafka는 지연 큐가 기본 기능이 아니므로, 아래 중 하나를 선택해야 한다. + +- 단계형 retry topic을 둔다 + - 예: `retry.10s`, `retry.1m`, `retry.10m` +- retry consumer가 `nextRetryAt`을 보고 아직 이르면 다시 publish한다 + +실무적으로는 단계형 retry topic이 단순하고 운영하기 쉽다. + +--- + +## 운영 시 주의점 + +### 1. 중복 전송 가능성 + +Kafka든 RabbitMQ든 외부 웹훅 호출은 결국 at-least-once 처리로 가는 것이 현실적이다. +따라서 디스코드 호출 전에 DB 상태 확인 또는 멱등성 키 기반 관리가 필요하다. + +### 2. 파티션 수 계획 + +Kafka를 선택하면 파티션 수가 중요하다. + +- 파티션 수가 너무 적으면 소비자 수를 늘려도 병렬성이 제한된다 +- 처음부터 예상 최대 소비자 수보다 넉넉하게 잡는 편이 낫다 + +### 3. 재시도 폭주 + +재시도 메시지가 한꺼번에 몰리면 오히려 장애를 키울 수 있다. + +- 백오프 전략을 두고 +- 실패 원인별로 재시도 여부를 구분하고 +- DLQ로 빨리 보내는 기준을 명확히 해야 한다 + +### 4. 외부 API rate limit + +Kafka로 확장해도 디스코드 웹훅 rate limit은 그대로다. +즉, 큐 처리량과 실제 발송 허용량은 분리해서 생각해야 한다. + +--- + +## 최종 판단 + +이 프로젝트에서는 `RabbitMQ`가 retry/DLQ 구현은 더 쉽지만, +`sub` 분리와 향후 스케일 아웃, replay, 이벤트 기반 확장성을 고려하면 `Kafka`가 더 나은 선택이다. + +특히 다음 조건이 동시에 맞기 때문이다. + +- 지금은 싱글 인스턴스라도, 나중에 소비자를 늘릴 가능성이 높다 +- 메시지를 단순 큐가 아니라 이벤트 스트림처럼 다루는 방향이 더 유리하다 +- Kafka를 학습하고 싶은 목적이 있다 + +따라서 이번 구조는 `Kafka`를 채택하고, retry/DLQ는 토픽 분리로 직접 설계하는 방향을 추천한다. + +--- + +## 한 줄 결론 + +기능 구현의 단순함만 보면 `RabbitMQ`, +미래의 `sub` 분리와 스케일 아웃, replay, 학습 목적까지 포함하면 `Kafka`가 더 나은 선택이다. diff --git a/domain/src/main/kotlin/site/techmoa/domain/event/OutboxStatus.kt b/domain/src/main/kotlin/site/techmoa/domain/event/OutboxStatus.kt index d9aab0e..1c2cb1f 100644 --- a/domain/src/main/kotlin/site/techmoa/domain/event/OutboxStatus.kt +++ b/domain/src/main/kotlin/site/techmoa/domain/event/OutboxStatus.kt @@ -4,5 +4,5 @@ enum class OutboxStatus { PENDING, PUBLISHING, SUCCESS, - FAILED + FAIL, } diff --git a/domain/src/main/kotlin/site/techmoa/domain/event/WebhookGatewayPort.kt b/domain/src/main/kotlin/site/techmoa/domain/event/WebhookMessageBroker.kt similarity index 83% rename from domain/src/main/kotlin/site/techmoa/domain/event/WebhookGatewayPort.kt rename to domain/src/main/kotlin/site/techmoa/domain/event/WebhookMessageBroker.kt index bb53a6c..5ebb372 100644 --- a/domain/src/main/kotlin/site/techmoa/domain/event/WebhookGatewayPort.kt +++ b/domain/src/main/kotlin/site/techmoa/domain/event/WebhookMessageBroker.kt @@ -2,6 +2,6 @@ package site.techmoa.domain.event import site.techmoa.domain.event.OutboxMessages.NewArticlesOutboxMessage.OutboxPayload -interface WebhookGatewayPort { +interface WebhookMessageBroker { fun publish(message: OutboxPayload) } \ No newline at end of file diff --git a/infrastructure/kafka/build.gradle.kts b/infrastructure/kafka/build.gradle.kts new file mode 100644 index 0000000..62b8152 --- /dev/null +++ b/infrastructure/kafka/build.gradle.kts @@ -0,0 +1,6 @@ +dependencies { + implementation(project(":domain")) + implementation(project(":infrastructure:rest")) + implementation("org.springframework.kafka:spring-kafka") + testImplementation("org.springframework.kafka:spring-kafka-test") +} diff --git a/infrastructure/kafka/src/main/kotlin/site/techmoa/infrastructure/kafka/config/KafkaConfig.kt b/infrastructure/kafka/src/main/kotlin/site/techmoa/infrastructure/kafka/config/KafkaConfig.kt new file mode 100644 index 0000000..11c29ea --- /dev/null +++ b/infrastructure/kafka/src/main/kotlin/site/techmoa/infrastructure/kafka/config/KafkaConfig.kt @@ -0,0 +1,42 @@ +package site.techmoa.infrastructure.kafka.config + +import org.springframework.boot.context.properties.EnableConfigurationProperties +import org.springframework.context.annotation.Bean +import org.springframework.context.annotation.Configuration +import org.springframework.kafka.annotation.EnableKafka +import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory +import org.springframework.kafka.core.* + +@Configuration +@EnableKafka +@EnableConfigurationProperties(KafkaProperties::class) +class KafkaConfig( + private val kafkaProperties: KafkaProperties, +) { + + @Bean + fun kafkaTemplate( + producerFactory: ProducerFactory, + ): KafkaTemplate { + return KafkaTemplate(producerFactory) + } + + @Bean + fun producerFactory(): ProducerFactory { + return DefaultKafkaProducerFactory(kafkaProperties.buildProducerProperties()) + } + + @Bean + fun consumerFactory(): ConsumerFactory { + return DefaultKafkaConsumerFactory(kafkaProperties.buildConsumerProperties()) + } + + @Bean + fun kafkaListenerContainerFactory( + consumerFactory: ConsumerFactory, + ): ConcurrentKafkaListenerContainerFactory { + val factory = ConcurrentKafkaListenerContainerFactory() + factory.consumerFactory = consumerFactory + return factory + } +} diff --git a/infrastructure/kafka/src/main/kotlin/site/techmoa/infrastructure/kafka/config/KafkaProperties.kt b/infrastructure/kafka/src/main/kotlin/site/techmoa/infrastructure/kafka/config/KafkaProperties.kt new file mode 100644 index 0000000..d2160c6 --- /dev/null +++ b/infrastructure/kafka/src/main/kotlin/site/techmoa/infrastructure/kafka/config/KafkaProperties.kt @@ -0,0 +1,48 @@ +package site.techmoa.infrastructure.kafka.config + +import org.apache.kafka.clients.consumer.ConsumerConfig +import org.apache.kafka.clients.producer.ProducerConfig +import org.apache.kafka.common.serialization.StringDeserializer +import org.apache.kafka.common.serialization.StringSerializer +import org.springframework.boot.context.properties.ConfigurationProperties + +@ConfigurationProperties(prefix = "spring.kafka") +data class KafkaProperties( + val bootstrapServers: String, + val producer: Producer, + val consumer: Consumer, +) { + fun buildProducerProperties(): Map { + return buildMap { + put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers) + put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer::class.java) + put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer::class.java) + put(ProducerConfig.ACKS_CONFIG, producer.acks) + putAll(producer.properties) + } + } + + fun buildConsumerProperties(): Map { + return buildMap { + put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers) + put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer::class.java) + put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer::class.java) + put(ConsumerConfig.GROUP_ID_CONFIG, consumer.groupId) + put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, consumer.autoOffsetReset) + put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, consumer.enableAutoCommit) + put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, consumer.maxPollRecords) + } + } + + data class Producer( + val acks: String, + val properties: Map = emptyMap(), + ) + + data class Consumer( + val groupId: String, + val autoOffsetReset: String, + val enableAutoCommit: Boolean, + val maxPollRecords: Int, + ) +} diff --git a/infrastructure/kafka/src/main/kotlin/site/techmoa/infrastructure/kafka/consumer/WebhookKafkaConsumer.kt b/infrastructure/kafka/src/main/kotlin/site/techmoa/infrastructure/kafka/consumer/WebhookKafkaConsumer.kt new file mode 100644 index 0000000..500a4c5 --- /dev/null +++ b/infrastructure/kafka/src/main/kotlin/site/techmoa/infrastructure/kafka/consumer/WebhookKafkaConsumer.kt @@ -0,0 +1,63 @@ +package site.techmoa.infrastructure.kafka.consumer + +import com.fasterxml.jackson.databind.ObjectMapper +import com.fasterxml.jackson.module.kotlin.readValue +import org.slf4j.LoggerFactory +import org.springframework.kafka.annotation.KafkaListener +import org.springframework.kafka.annotation.RetryableTopic +import org.springframework.kafka.retrytopic.DltStrategy +import org.springframework.retry.annotation.Backoff +import org.springframework.stereotype.Component +import site.techmoa.domain.event.OutboxMessages +import site.techmoa.infrastructure.rest.DiscordClient + +@Component +class WebhookKafkaConsumer( + private val client : DiscordClient, + private val objectMapper: ObjectMapper, +) { + + companion object { + private const val TOPIC_NAME = "webhook-discord" + } + + private val log = LoggerFactory.getLogger(javaClass) + + @KafkaListener( + topics = [TOPIC_NAME], + containerFactory = "kafkaListenerContainerFactory", + ) + @RetryableTopic( + attempts = "4", + include = [IllegalStateException::class], + backoff = Backoff( + delay = 4_000, + multiplier = 2.0, + maxDelay = 60_000, + random = true, + ), + dltStrategy = DltStrategy.FAIL_ON_ERROR, + ) + fun consume(message: String) { + try { + val payload = objectMapper.readValue(message) + client.post(payload) + log.info( + "Consumed webhook message from Kafka and published to Discord. topic={}, articleId={}, webhookUrl={}", + TOPIC_NAME, + payload.articleId, + payload.webhookUrl, + ) + } catch (ex: Exception) { + log.error( + "Failed to consume webhook message from Kafka. topic={}", + TOPIC_NAME, + ex, + ) + throw IllegalStateException( + "Failed to consume webhook message from Kafka. topic=$TOPIC_NAME", + ex, + ) + } + } +} \ No newline at end of file diff --git a/infrastructure/kafka/src/main/kotlin/site/techmoa/infrastructure/kafka/publisher/WebhookKafkaPublisher.kt b/infrastructure/kafka/src/main/kotlin/site/techmoa/infrastructure/kafka/publisher/WebhookKafkaPublisher.kt new file mode 100644 index 0000000..14e1c6e --- /dev/null +++ b/infrastructure/kafka/src/main/kotlin/site/techmoa/infrastructure/kafka/publisher/WebhookKafkaPublisher.kt @@ -0,0 +1,53 @@ +package site.techmoa.infrastructure.kafka.publisher + +import com.fasterxml.jackson.databind.ObjectMapper +import org.slf4j.LoggerFactory +import org.springframework.kafka.core.KafkaTemplate +import org.springframework.stereotype.Component +import site.techmoa.domain.event.OutboxMessages +import site.techmoa.domain.event.WebhookMessageBroker +import java.util.concurrent.ExecutionException + +@Component +class WebhookKafkaPublisher( + private val kafkaTemplate: KafkaTemplate, + private val objectMapper: ObjectMapper, +) : WebhookMessageBroker { + + companion object { + const val WEBHOOK_DISCORD = "webhook-discord" + } + + private val log = LoggerFactory.getLogger(javaClass) + + override fun publish(message: OutboxMessages.NewArticlesOutboxMessage.OutboxPayload) { + val payload = objectMapper.writeValueAsString(message) + try { + kafkaTemplate.send(WEBHOOK_DISCORD, payload).get() + log.info( + "Published webhook message to Kafka. topic={}, articleId={}, webhookUrl={}", + WEBHOOK_DISCORD, + message.articleId, + message.webhookUrl, + ) + } catch (ex: InterruptedException) { + Thread.currentThread().interrupt() + log.error( + "Interrupted while publishing webhook message to Kafka. topic={}, articleId={}, webhookUrl={}", + WEBHOOK_DISCORD, + message.articleId, + message.webhookUrl, + ex, + ) + throw IllegalStateException( + "Interrupted while publishing webhook message to Kafka. topic=$WEBHOOK_DISCORD, articleId=${message.articleId}, webhookUrl=${message.webhookUrl}", + ex, + ) + } catch (ex: ExecutionException) { + throw IllegalStateException( + "Failed to publish webhook message to Kafka. topic=$WEBHOOK_DISCORD, articleId=${message.articleId}, webhookUrl=${message.webhookUrl}", + ex.cause ?: ex, + ) + } + } +} \ No newline at end of file diff --git a/infrastructure/kafka/src/main/resources/application-infrastructure-kafka-local.yml b/infrastructure/kafka/src/main/resources/application-infrastructure-kafka-local.yml new file mode 100644 index 0000000..a973739 --- /dev/null +++ b/infrastructure/kafka/src/main/resources/application-infrastructure-kafka-local.yml @@ -0,0 +1,16 @@ +spring: + kafka: + bootstrap-servers: localhost:9092 # 필수: Kafka 브로커 주소 + + producer: + acks: all # 권장 + properties: + enable.idempotence: true # 권장: 중복 발행 방지 + linger.ms: 10 # 선택: 배치 지연 시간 + delivery.timeout.ms: 30000 # 선택: 전송 타임아웃 + + consumer: + group-id: webhook-discord-local # 필수: subscriber consumer group + auto-offset-reset: earliest # 선택: offset이 없을 때 처음부터 읽기 + enable-auto-commit: false # 권장: retry/DLT 처리 시 자동 커밋 비활성화 + max-poll-records: 10 # 선택: 한 번에 읽을 최대 레코드 수 diff --git a/infrastructure/kafka/src/main/resources/application-infrastructure-kafka-prod.yml b/infrastructure/kafka/src/main/resources/application-infrastructure-kafka-prod.yml new file mode 100644 index 0000000..21a8d09 --- /dev/null +++ b/infrastructure/kafka/src/main/resources/application-infrastructure-kafka-prod.yml @@ -0,0 +1,16 @@ +spring: + kafka: + bootstrap-servers: techmoa-kafka # 필수: Kafka 브로커 주소 + + producer: + acks: all # 권장 + properties: + enable.idempotence: true # 권장: 중복 발행 방지 + linger.ms: 10 # 선택: 배치 지연 시간 + delivery.timeout.ms: 30000 # 선택: 전송 타임아웃 + + consumer: + group-id: webhook-discord-local # 필수: subscriber consumer group + auto-offset-reset: earliest # 선택: offset이 없을 때 처음부터 읽기 + enable-auto-commit: false # 권장: retry/DLT 처리 시 자동 커밋 비활성화 + max-poll-records: 10 # 선택: 한 번에 읽을 최대 레코드 수 diff --git a/infrastructure/rest/src/main/kotlin/site/techmoa/infrastructure/rest/adapter/WebhookRestPublisher.kt b/infrastructure/rest/src/main/kotlin/site/techmoa/infrastructure/rest/DiscordClient.kt similarity index 81% rename from infrastructure/rest/src/main/kotlin/site/techmoa/infrastructure/rest/adapter/WebhookRestPublisher.kt rename to infrastructure/rest/src/main/kotlin/site/techmoa/infrastructure/rest/DiscordClient.kt index 18632d1..6d757e9 100644 --- a/infrastructure/rest/src/main/kotlin/site/techmoa/infrastructure/rest/adapter/WebhookRestPublisher.kt +++ b/infrastructure/rest/src/main/kotlin/site/techmoa/infrastructure/rest/DiscordClient.kt @@ -1,26 +1,24 @@ -package site.techmoa.infrastructure.rest.adapter +package site.techmoa.infrastructure.rest import org.slf4j.LoggerFactory import org.springframework.http.MediaType import org.springframework.stereotype.Component import org.springframework.web.reactive.function.client.WebClient import site.techmoa.domain.event.OutboxMessages.NewArticlesOutboxMessage.OutboxPayload -import site.techmoa.domain.event.WebhookGatewayPort -import site.techmoa.infrastructure.rest.dto.DiscordEmbed -import site.techmoa.infrastructure.rest.dto.DiscordWebhookPayload +import site.techmoa.infrastructure.rest.DiscordWebhookPayload.DiscordEmbed import java.time.Instant import java.time.ZoneId import java.time.format.DateTimeFormatter @Component -class WebhookRestPublisher( +class DiscordClient( webClient: WebClient.Builder -) : WebhookGatewayPort { +) { private val log = LoggerFactory.getLogger(javaClass) private val discordClient = webClient.build() - override fun publish(message: OutboxPayload) { + fun post(message: OutboxPayload) { val payload = toDiscordPayload(message) val response = discordClient.post() .uri(message.webhookUrl) @@ -31,7 +29,7 @@ class WebhookRestPublisher( .block() if (response == null || !response.statusCode.is2xxSuccessful) { - log.error("Webhook publishing error: ${response.body}") + log.error("Webhook publishing error. webhookUrl={}, status={}", message.webhookUrl, response?.statusCode) throw IllegalStateException("Failed to publish webhook. webhookUrl=${message.webhookUrl}, status=${response?.statusCode}") } diff --git a/infrastructure/rest/src/main/kotlin/site/techmoa/infrastructure/rest/DiscordWebhookPayload.kt b/infrastructure/rest/src/main/kotlin/site/techmoa/infrastructure/rest/DiscordWebhookPayload.kt new file mode 100644 index 0000000..e77ee08 --- /dev/null +++ b/infrastructure/rest/src/main/kotlin/site/techmoa/infrastructure/rest/DiscordWebhookPayload.kt @@ -0,0 +1,13 @@ +package site.techmoa.infrastructure.rest + +data class DiscordWebhookPayload( + val username: String? = null, + val avatarUrl: String? = null, + val embeds: List +) { + data class DiscordEmbed( + val title: String, + val description: String, + val url: String, + ) +} diff --git a/infrastructure/rest/src/main/kotlin/site/techmoa/infrastructure/rest/dto/DiscordWebhookPayload.kt b/infrastructure/rest/src/main/kotlin/site/techmoa/infrastructure/rest/dto/DiscordWebhookPayload.kt deleted file mode 100644 index 8128dd0..0000000 --- a/infrastructure/rest/src/main/kotlin/site/techmoa/infrastructure/rest/dto/DiscordWebhookPayload.kt +++ /dev/null @@ -1,13 +0,0 @@ -package site.techmoa.infrastructure.rest.dto - -data class DiscordWebhookPayload( - val username: String? = null, - val avatarUrl: String? = null, - val embeds: List -) - -data class DiscordEmbed( - val title: String, - val description: String, - val url: String, -) diff --git a/settings.gradle.kts b/settings.gradle.kts index d5f939e..2259206 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -31,7 +31,8 @@ include("infrastructure:mysql") include("infrastructure:jpa") include("infrastructure:oauth") include("infrastructure:rest") +include("infrastructure:kafka") -include("batch") -include("batch:rss") -include("batch:schedules") \ No newline at end of file +include("worker") +include("worker:rss") +include("worker:scheduler") \ No newline at end of file diff --git a/batch/rss/build.gradle.kts b/worker/rss/build.gradle.kts similarity index 100% rename from batch/rss/build.gradle.kts rename to worker/rss/build.gradle.kts diff --git a/batch/rss/src/main/java/site/techmoa/batch/rss/support/CustomInvalidXmlCharacterFilter.java b/worker/rss/src/main/java/site/techmoa/worker/rss/support/CustomInvalidXmlCharacterFilter.java similarity index 99% rename from batch/rss/src/main/java/site/techmoa/batch/rss/support/CustomInvalidXmlCharacterFilter.java rename to worker/rss/src/main/java/site/techmoa/worker/rss/support/CustomInvalidXmlCharacterFilter.java index d019c79..a136f23 100644 --- a/batch/rss/src/main/java/site/techmoa/batch/rss/support/CustomInvalidXmlCharacterFilter.java +++ b/worker/rss/src/main/java/site/techmoa/worker/rss/support/CustomInvalidXmlCharacterFilter.java @@ -1,4 +1,4 @@ -package site.techmoa.batch.rss.support; +package site.techmoa.worker.rss.support; import java.io.ByteArrayOutputStream; import java.io.IOException; diff --git a/batch/rss/src/main/kotlin/site/techmoa/batch/rss/config/BatchRssConfig.kt b/worker/rss/src/main/kotlin/site/techmoa/worker/rss/config/BatchRssConfig.kt similarity index 97% rename from batch/rss/src/main/kotlin/site/techmoa/batch/rss/config/BatchRssConfig.kt rename to worker/rss/src/main/kotlin/site/techmoa/worker/rss/config/BatchRssConfig.kt index a1f36dd..3543a91 100644 --- a/batch/rss/src/main/kotlin/site/techmoa/batch/rss/config/BatchRssConfig.kt +++ b/worker/rss/src/main/kotlin/site/techmoa/worker/rss/config/BatchRssConfig.kt @@ -1,4 +1,4 @@ -package site.techmoa.batch.rss.config +package site.techmoa.worker.rss.config import org.slf4j.LoggerFactory import org.springframework.context.annotation.Bean diff --git a/batch/rss/src/main/kotlin/site/techmoa/batch/rss/domain/exception/RssCollectionExecutionException.kt b/worker/rss/src/main/kotlin/site/techmoa/worker/rss/domain/exception/RssCollectionExecutionException.kt similarity index 70% rename from batch/rss/src/main/kotlin/site/techmoa/batch/rss/domain/exception/RssCollectionExecutionException.kt rename to worker/rss/src/main/kotlin/site/techmoa/worker/rss/domain/exception/RssCollectionExecutionException.kt index 163857e..0f87d20 100644 --- a/batch/rss/src/main/kotlin/site/techmoa/batch/rss/domain/exception/RssCollectionExecutionException.kt +++ b/worker/rss/src/main/kotlin/site/techmoa/worker/rss/domain/exception/RssCollectionExecutionException.kt @@ -1,4 +1,4 @@ -package site.techmoa.batch.rss.domain.exception +package site.techmoa.worker.rss.domain.exception class RssCollectionExecutionException( message: String, diff --git a/batch/rss/src/main/kotlin/site/techmoa/batch/rss/domain/model/Article.kt b/worker/rss/src/main/kotlin/site/techmoa/worker/rss/domain/model/Article.kt similarity index 93% rename from batch/rss/src/main/kotlin/site/techmoa/batch/rss/domain/model/Article.kt rename to worker/rss/src/main/kotlin/site/techmoa/worker/rss/domain/model/Article.kt index 02c2b61..65344b7 100644 --- a/batch/rss/src/main/kotlin/site/techmoa/batch/rss/domain/model/Article.kt +++ b/worker/rss/src/main/kotlin/site/techmoa/worker/rss/domain/model/Article.kt @@ -1,4 +1,4 @@ -package site.techmoa.batch.rss.domain.model +package site.techmoa.worker.rss.domain.model data class Article( val id: Long, diff --git a/batch/rss/src/main/kotlin/site/techmoa/batch/rss/domain/model/Blog.kt b/worker/rss/src/main/kotlin/site/techmoa/worker/rss/domain/model/Blog.kt similarity index 75% rename from batch/rss/src/main/kotlin/site/techmoa/batch/rss/domain/model/Blog.kt rename to worker/rss/src/main/kotlin/site/techmoa/worker/rss/domain/model/Blog.kt index 5c5d004..dfd0a5b 100644 --- a/batch/rss/src/main/kotlin/site/techmoa/batch/rss/domain/model/Blog.kt +++ b/worker/rss/src/main/kotlin/site/techmoa/worker/rss/domain/model/Blog.kt @@ -1,4 +1,4 @@ -package site.techmoa.batch.rss.domain.model +package site.techmoa.worker.rss.domain.model data class Blog( val id: Long, diff --git a/batch/rss/src/main/kotlin/site/techmoa/batch/rss/domain/model/BlogStatus.kt b/worker/rss/src/main/kotlin/site/techmoa/worker/rss/domain/model/BlogStatus.kt similarity index 58% rename from batch/rss/src/main/kotlin/site/techmoa/batch/rss/domain/model/BlogStatus.kt rename to worker/rss/src/main/kotlin/site/techmoa/worker/rss/domain/model/BlogStatus.kt index 15fc9af..aff27fb 100644 --- a/batch/rss/src/main/kotlin/site/techmoa/batch/rss/domain/model/BlogStatus.kt +++ b/worker/rss/src/main/kotlin/site/techmoa/worker/rss/domain/model/BlogStatus.kt @@ -1,4 +1,4 @@ -package site.techmoa.batch.rss.domain.model +package site.techmoa.worker.rss.domain.model enum class BlogStatus { ACTIVE, diff --git a/batch/rss/src/main/kotlin/site/techmoa/batch/rss/port/ArticlePort.kt b/worker/rss/src/main/kotlin/site/techmoa/worker/rss/port/ArticlePort.kt similarity index 63% rename from batch/rss/src/main/kotlin/site/techmoa/batch/rss/port/ArticlePort.kt rename to worker/rss/src/main/kotlin/site/techmoa/worker/rss/port/ArticlePort.kt index e9318b6..d8cf07f 100644 --- a/batch/rss/src/main/kotlin/site/techmoa/batch/rss/port/ArticlePort.kt +++ b/worker/rss/src/main/kotlin/site/techmoa/worker/rss/port/ArticlePort.kt @@ -1,6 +1,6 @@ -package site.techmoa.batch.rss.port +package site.techmoa.worker.rss.port -import site.techmoa.batch.rss.domain.model.Article +import site.techmoa.worker.rss.domain.model.Article interface ArticlePort { fun saveAllIgnoringDuplicates(articles: List
) diff --git a/worker/rss/src/main/kotlin/site/techmoa/worker/rss/port/BlogPort.kt b/worker/rss/src/main/kotlin/site/techmoa/worker/rss/port/BlogPort.kt new file mode 100644 index 0000000..af33382 --- /dev/null +++ b/worker/rss/src/main/kotlin/site/techmoa/worker/rss/port/BlogPort.kt @@ -0,0 +1,8 @@ +package site.techmoa.worker.rss.port + +import site.techmoa.worker.rss.domain.model.Blog +import site.techmoa.worker.rss.domain.model.BlogStatus + +interface BlogPort { + fun findAllBy(active: BlogStatus): List +} \ No newline at end of file diff --git a/batch/rss/src/main/kotlin/site/techmoa/batch/rss/repository/ArticleRepository.kt b/worker/rss/src/main/kotlin/site/techmoa/worker/rss/repository/ArticleRepository.kt similarity index 92% rename from batch/rss/src/main/kotlin/site/techmoa/batch/rss/repository/ArticleRepository.kt rename to worker/rss/src/main/kotlin/site/techmoa/worker/rss/repository/ArticleRepository.kt index 8c9cf96..ce95aa3 100644 --- a/batch/rss/src/main/kotlin/site/techmoa/batch/rss/repository/ArticleRepository.kt +++ b/worker/rss/src/main/kotlin/site/techmoa/worker/rss/repository/ArticleRepository.kt @@ -1,11 +1,11 @@ -package site.techmoa.batch.rss.repository +package site.techmoa.worker.rss.repository import org.springframework.jdbc.core.JdbcTemplate import org.springframework.stereotype.Repository import org.springframework.transaction.annotation.Propagation import org.springframework.transaction.annotation.Transactional -import site.techmoa.batch.rss.domain.model.Article -import site.techmoa.batch.rss.port.ArticlePort +import site.techmoa.worker.rss.domain.model.Article +import site.techmoa.worker.rss.port.ArticlePort import java.sql.Timestamp import java.time.Instant diff --git a/batch/rss/src/main/kotlin/site/techmoa/batch/rss/repository/BlogRepository.kt b/worker/rss/src/main/kotlin/site/techmoa/worker/rss/repository/BlogRepository.kt similarity index 82% rename from batch/rss/src/main/kotlin/site/techmoa/batch/rss/repository/BlogRepository.kt rename to worker/rss/src/main/kotlin/site/techmoa/worker/rss/repository/BlogRepository.kt index 1e62b15..d643a0b 100644 --- a/batch/rss/src/main/kotlin/site/techmoa/batch/rss/repository/BlogRepository.kt +++ b/worker/rss/src/main/kotlin/site/techmoa/worker/rss/repository/BlogRepository.kt @@ -1,10 +1,10 @@ -package site.techmoa.batch.rss.repository +package site.techmoa.worker.rss.repository import org.springframework.jdbc.core.JdbcTemplate import org.springframework.stereotype.Repository -import site.techmoa.batch.rss.domain.model.Blog -import site.techmoa.batch.rss.domain.model.BlogStatus -import site.techmoa.batch.rss.port.BlogPort +import site.techmoa.worker.rss.domain.model.Blog +import site.techmoa.worker.rss.domain.model.BlogStatus +import site.techmoa.worker.rss.port.BlogPort @Repository(value = "RssBlogRepository") class BlogRepository( diff --git a/batch/rss/src/main/kotlin/site/techmoa/batch/rss/support/ArticleLinkManager.kt b/worker/rss/src/main/kotlin/site/techmoa/worker/rss/support/ArticleLinkManager.kt similarity index 92% rename from batch/rss/src/main/kotlin/site/techmoa/batch/rss/support/ArticleLinkManager.kt rename to worker/rss/src/main/kotlin/site/techmoa/worker/rss/support/ArticleLinkManager.kt index fd9afc5..01fbbde 100644 --- a/batch/rss/src/main/kotlin/site/techmoa/batch/rss/support/ArticleLinkManager.kt +++ b/worker/rss/src/main/kotlin/site/techmoa/worker/rss/support/ArticleLinkManager.kt @@ -1,4 +1,4 @@ -package site.techmoa.batch.rss.support +package site.techmoa.worker.rss.support import org.springframework.stereotype.Component diff --git a/batch/rss/src/main/kotlin/site/techmoa/batch/rss/support/ParsedItem.kt b/worker/rss/src/main/kotlin/site/techmoa/worker/rss/support/ParsedItem.kt similarity index 84% rename from batch/rss/src/main/kotlin/site/techmoa/batch/rss/support/ParsedItem.kt rename to worker/rss/src/main/kotlin/site/techmoa/worker/rss/support/ParsedItem.kt index 407d261..72a8717 100644 --- a/batch/rss/src/main/kotlin/site/techmoa/batch/rss/support/ParsedItem.kt +++ b/worker/rss/src/main/kotlin/site/techmoa/worker/rss/support/ParsedItem.kt @@ -1,4 +1,4 @@ -package site.techmoa.batch.rss.support +package site.techmoa.worker.rss.support sealed class ParsedItem { diff --git a/worker/rss/src/main/kotlin/site/techmoa/worker/rss/support/RssClient.kt b/worker/rss/src/main/kotlin/site/techmoa/worker/rss/support/RssClient.kt new file mode 100644 index 0000000..9dff2ec --- /dev/null +++ b/worker/rss/src/main/kotlin/site/techmoa/worker/rss/support/RssClient.kt @@ -0,0 +1,8 @@ +package site.techmoa.worker.rss.support + +import site.techmoa.worker.rss.domain.model.Article +import site.techmoa.worker.rss.domain.model.Blog + +interface RssClient { + fun fetch(blog: Blog): List
+} \ No newline at end of file diff --git a/batch/rss/src/main/kotlin/site/techmoa/batch/rss/support/RssReaderClient.kt b/worker/rss/src/main/kotlin/site/techmoa/worker/rss/support/RssReaderClient.kt similarity index 92% rename from batch/rss/src/main/kotlin/site/techmoa/batch/rss/support/RssReaderClient.kt rename to worker/rss/src/main/kotlin/site/techmoa/worker/rss/support/RssReaderClient.kt index f1ec216..2a323b5 100644 --- a/batch/rss/src/main/kotlin/site/techmoa/batch/rss/support/RssReaderClient.kt +++ b/worker/rss/src/main/kotlin/site/techmoa/worker/rss/support/RssReaderClient.kt @@ -1,10 +1,10 @@ -package site.techmoa.batch.rss.support +package site.techmoa.worker.rss.support import com.apptasticsoftware.rssreader.Item import com.apptasticsoftware.rssreader.RssReader import org.springframework.stereotype.Component -import site.techmoa.batch.rss.domain.model.Article -import site.techmoa.batch.rss.domain.model.Blog +import site.techmoa.worker.rss.domain.model.Article +import site.techmoa.worker.rss.domain.model.Blog import java.time.ZonedDateTime /** @@ -16,7 +16,7 @@ import java.time.ZonedDateTime * - 해당 필터는 추후 라이브러리 교체 또는 * 파싱 전략 변경 시 제거 예정입니다. * - * @see com.apptasticsoftware.rssreader.RssReader + * @see RssReader * @see CustomInvalidXmlCharacterFilter */ @Component diff --git a/batch/rss/src/main/kotlin/site/techmoa/batch/rss/trigger/CollectRssUseCase.kt b/worker/rss/src/main/kotlin/site/techmoa/worker/rss/trigger/CollectRssUseCase.kt similarity index 55% rename from batch/rss/src/main/kotlin/site/techmoa/batch/rss/trigger/CollectRssUseCase.kt rename to worker/rss/src/main/kotlin/site/techmoa/worker/rss/trigger/CollectRssUseCase.kt index 986d868..64d08e5 100644 --- a/batch/rss/src/main/kotlin/site/techmoa/batch/rss/trigger/CollectRssUseCase.kt +++ b/worker/rss/src/main/kotlin/site/techmoa/worker/rss/trigger/CollectRssUseCase.kt @@ -1,4 +1,4 @@ -package site.techmoa.batch.rss.trigger +package site.techmoa.worker.rss.trigger interface CollectRssUseCase { fun execute() diff --git a/batch/rss/src/main/kotlin/site/techmoa/batch/rss/trigger/LocalRssCollectorTrigger.kt b/worker/rss/src/main/kotlin/site/techmoa/worker/rss/trigger/LocalRssCollectorTrigger.kt similarity index 92% rename from batch/rss/src/main/kotlin/site/techmoa/batch/rss/trigger/LocalRssCollectorTrigger.kt rename to worker/rss/src/main/kotlin/site/techmoa/worker/rss/trigger/LocalRssCollectorTrigger.kt index b7d67b0..b6de04b 100644 --- a/batch/rss/src/main/kotlin/site/techmoa/batch/rss/trigger/LocalRssCollectorTrigger.kt +++ b/worker/rss/src/main/kotlin/site/techmoa/worker/rss/trigger/LocalRssCollectorTrigger.kt @@ -1,4 +1,4 @@ -package site.techmoa.batch.rss.trigger +package site.techmoa.worker.rss.trigger import org.springframework.context.annotation.Profile import org.springframework.scheduling.annotation.Scheduled diff --git a/batch/rss/src/main/kotlin/site/techmoa/batch/rss/trigger/ProdRssCollectorTrigger.kt b/worker/rss/src/main/kotlin/site/techmoa/worker/rss/trigger/ProdRssCollectorTrigger.kt similarity index 92% rename from batch/rss/src/main/kotlin/site/techmoa/batch/rss/trigger/ProdRssCollectorTrigger.kt rename to worker/rss/src/main/kotlin/site/techmoa/worker/rss/trigger/ProdRssCollectorTrigger.kt index a18cf8e..682c023 100644 --- a/batch/rss/src/main/kotlin/site/techmoa/batch/rss/trigger/ProdRssCollectorTrigger.kt +++ b/worker/rss/src/main/kotlin/site/techmoa/worker/rss/trigger/ProdRssCollectorTrigger.kt @@ -1,4 +1,4 @@ -package site.techmoa.batch.rss.trigger +package site.techmoa.worker.rss.trigger import org.springframework.context.annotation.Profile import org.springframework.scheduling.annotation.Scheduled diff --git a/batch/rss/src/main/kotlin/site/techmoa/batch/rss/trigger/RssCollectorTriggerSupport.kt b/worker/rss/src/main/kotlin/site/techmoa/worker/rss/trigger/RssCollectorTriggerSupport.kt similarity index 88% rename from batch/rss/src/main/kotlin/site/techmoa/batch/rss/trigger/RssCollectorTriggerSupport.kt rename to worker/rss/src/main/kotlin/site/techmoa/worker/rss/trigger/RssCollectorTriggerSupport.kt index 54e4bc6..22b9d6a 100644 --- a/batch/rss/src/main/kotlin/site/techmoa/batch/rss/trigger/RssCollectorTriggerSupport.kt +++ b/worker/rss/src/main/kotlin/site/techmoa/worker/rss/trigger/RssCollectorTriggerSupport.kt @@ -1,7 +1,7 @@ -package site.techmoa.batch.rss.trigger +package site.techmoa.worker.rss.trigger import org.slf4j.LoggerFactory -import site.techmoa.batch.rss.domain.exception.RssCollectionExecutionException +import site.techmoa.worker.rss.domain.exception.RssCollectionExecutionException abstract class RssCollectorTriggerSupport( private val collector: CollectRssUseCase diff --git a/batch/rss/src/main/kotlin/site/techmoa/batch/rss/usecase/CollectByRssReaderUseCase.kt b/worker/rss/src/main/kotlin/site/techmoa/worker/rss/usecase/CollectByRssReaderUseCase.kt similarity index 88% rename from batch/rss/src/main/kotlin/site/techmoa/batch/rss/usecase/CollectByRssReaderUseCase.kt rename to worker/rss/src/main/kotlin/site/techmoa/worker/rss/usecase/CollectByRssReaderUseCase.kt index fd9e5e8..96194e3 100644 --- a/batch/rss/src/main/kotlin/site/techmoa/batch/rss/usecase/CollectByRssReaderUseCase.kt +++ b/worker/rss/src/main/kotlin/site/techmoa/worker/rss/usecase/CollectByRssReaderUseCase.kt @@ -1,15 +1,15 @@ -package site.techmoa.batch.rss.usecase +package site.techmoa.worker.rss.usecase import org.slf4j.LoggerFactory import org.springframework.beans.factory.annotation.Qualifier import org.springframework.stereotype.Component -import site.techmoa.batch.rss.domain.model.Article -import site.techmoa.batch.rss.domain.model.Blog -import site.techmoa.batch.rss.domain.model.BlogStatus -import site.techmoa.batch.rss.port.ArticlePort -import site.techmoa.batch.rss.port.BlogPort -import site.techmoa.batch.rss.support.RssClient -import site.techmoa.batch.rss.trigger.CollectRssUseCase +import site.techmoa.worker.rss.domain.model.Article +import site.techmoa.worker.rss.domain.model.Blog +import site.techmoa.worker.rss.domain.model.BlogStatus +import site.techmoa.worker.rss.port.ArticlePort +import site.techmoa.worker.rss.port.BlogPort +import site.techmoa.worker.rss.support.RssClient +import site.techmoa.worker.rss.trigger.CollectRssUseCase import java.util.concurrent.CompletableFuture import java.util.concurrent.Executor import java.util.concurrent.atomic.AtomicInteger diff --git a/batch/schedules/build.gradle.kts b/worker/scheduler/build.gradle.kts similarity index 99% rename from batch/schedules/build.gradle.kts rename to worker/scheduler/build.gradle.kts index 3d07337..185d697 100644 --- a/batch/schedules/build.gradle.kts +++ b/worker/scheduler/build.gradle.kts @@ -1,8 +1,7 @@ dependencies { implementation(project(":domain")) - runtimeOnly(project(":infrastructure:mysql")) - implementation("org.springframework.boot:spring-boot-starter-jdbc") + runtimeOnly(project(":infrastructure:mysql")) compileOnly("org.springframework:spring-tx") compileOnly("org.springframework:spring-context") } diff --git a/batch/schedules/src/main/kotlin/site/techmoa/batch/schedules/annotation/EventHandler.kt b/worker/scheduler/src/main/kotlin/site/techmoa/worker/scheduler/annotation/EventHandler.kt similarity index 77% rename from batch/schedules/src/main/kotlin/site/techmoa/batch/schedules/annotation/EventHandler.kt rename to worker/scheduler/src/main/kotlin/site/techmoa/worker/scheduler/annotation/EventHandler.kt index f1f18b8..57d0506 100644 --- a/batch/schedules/src/main/kotlin/site/techmoa/batch/schedules/annotation/EventHandler.kt +++ b/worker/scheduler/src/main/kotlin/site/techmoa/worker/scheduler/annotation/EventHandler.kt @@ -1,4 +1,4 @@ -package site.techmoa.batch.schedules.annotation +package site.techmoa.worker.scheduler.annotation import org.springframework.stereotype.Component diff --git a/batch/schedules/src/main/kotlin/site/techmoa/batch/schedules/config/BatchSchedulesConfig.kt b/worker/scheduler/src/main/kotlin/site/techmoa/worker/scheduler/config/BatchSchedulesConfig.kt similarity index 95% rename from batch/schedules/src/main/kotlin/site/techmoa/batch/schedules/config/BatchSchedulesConfig.kt rename to worker/scheduler/src/main/kotlin/site/techmoa/worker/scheduler/config/BatchSchedulesConfig.kt index d332db1..7d0a7e1 100644 --- a/batch/schedules/src/main/kotlin/site/techmoa/batch/schedules/config/BatchSchedulesConfig.kt +++ b/worker/scheduler/src/main/kotlin/site/techmoa/worker/scheduler/config/BatchSchedulesConfig.kt @@ -1,4 +1,4 @@ -package site.techmoa.batch.schedules.config +package site.techmoa.worker.scheduler.config import org.slf4j.LoggerFactory import org.springframework.context.annotation.Bean diff --git a/batch/schedules/src/main/kotlin/site/techmoa/batch/schedules/dto/LastScannedArticleDto.kt b/worker/scheduler/src/main/kotlin/site/techmoa/worker/scheduler/dto/LastScannedArticleDto.kt similarity index 60% rename from batch/schedules/src/main/kotlin/site/techmoa/batch/schedules/dto/LastScannedArticleDto.kt rename to worker/scheduler/src/main/kotlin/site/techmoa/worker/scheduler/dto/LastScannedArticleDto.kt index 664c718..4dc597a 100644 --- a/batch/schedules/src/main/kotlin/site/techmoa/batch/schedules/dto/LastScannedArticleDto.kt +++ b/worker/scheduler/src/main/kotlin/site/techmoa/worker/scheduler/dto/LastScannedArticleDto.kt @@ -1,4 +1,4 @@ -package site.techmoa.batch.schedules.dto +package site.techmoa.worker.scheduler.dto data class LastScannedArticleDto( val articleId: Long = 0, diff --git a/batch/schedules/src/main/kotlin/site/techmoa/batch/schedules/dto/NewArticleDto.kt b/worker/scheduler/src/main/kotlin/site/techmoa/worker/scheduler/dto/NewArticleDto.kt similarity index 80% rename from batch/schedules/src/main/kotlin/site/techmoa/batch/schedules/dto/NewArticleDto.kt rename to worker/scheduler/src/main/kotlin/site/techmoa/worker/scheduler/dto/NewArticleDto.kt index 85254de..c086c1b 100644 --- a/batch/schedules/src/main/kotlin/site/techmoa/batch/schedules/dto/NewArticleDto.kt +++ b/worker/scheduler/src/main/kotlin/site/techmoa/worker/scheduler/dto/NewArticleDto.kt @@ -1,4 +1,4 @@ -package site.techmoa.batch.schedules.dto +package site.techmoa.worker.scheduler.dto data class NewArticleDto( val articleId: Long, diff --git a/batch/schedules/src/main/kotlin/site/techmoa/batch/schedules/event/NewArticlesEventHandler.kt b/worker/scheduler/src/main/kotlin/site/techmoa/worker/scheduler/event/NewArticlesEventHandler.kt similarity index 71% rename from batch/schedules/src/main/kotlin/site/techmoa/batch/schedules/event/NewArticlesEventHandler.kt rename to worker/scheduler/src/main/kotlin/site/techmoa/worker/scheduler/event/NewArticlesEventHandler.kt index 3f910ab..f254d35 100644 --- a/batch/schedules/src/main/kotlin/site/techmoa/batch/schedules/event/NewArticlesEventHandler.kt +++ b/worker/scheduler/src/main/kotlin/site/techmoa/worker/scheduler/event/NewArticlesEventHandler.kt @@ -1,16 +1,14 @@ -package site.techmoa.batch.schedules.event +package site.techmoa.worker.scheduler.event import org.springframework.transaction.annotation.Propagation import org.springframework.transaction.annotation.Transactional import org.springframework.transaction.event.TransactionPhase import org.springframework.transaction.event.TransactionalEventListener -import site.techmoa.batch.schedules.annotation.EventHandler -import site.techmoa.batch.schedules.repository.OutboxRepository import site.techmoa.domain.event.OutboxMessages -@EventHandler +@site.techmoa.worker.scheduler.annotation.EventHandler class NewArticlesEventHandler( - private val outboxRepository: OutboxRepository + private val outboxRepository: site.techmoa.worker.scheduler.repository.OutboxRepository ) { @TransactionalEventListener(phase = TransactionPhase.BEFORE_COMMIT) diff --git a/batch/schedules/src/main/kotlin/site/techmoa/batch/schedules/repository/ArticleRepository.kt b/worker/scheduler/src/main/kotlin/site/techmoa/worker/scheduler/repository/ArticleRepository.kt similarity index 83% rename from batch/schedules/src/main/kotlin/site/techmoa/batch/schedules/repository/ArticleRepository.kt rename to worker/scheduler/src/main/kotlin/site/techmoa/worker/scheduler/repository/ArticleRepository.kt index d720479..fb14081 100644 --- a/batch/schedules/src/main/kotlin/site/techmoa/batch/schedules/repository/ArticleRepository.kt +++ b/worker/scheduler/src/main/kotlin/site/techmoa/worker/scheduler/repository/ArticleRepository.kt @@ -1,16 +1,16 @@ -package site.techmoa.batch.schedules.repository +package site.techmoa.worker.scheduler.repository import org.springframework.jdbc.core.JdbcTemplate import org.springframework.jdbc.core.RowMapper import org.springframework.stereotype.Repository -import site.techmoa.batch.schedules.dto.NewArticleDto +import site.techmoa.worker.scheduler.dto.NewArticleDto @Repository(value = "SchedulesArticleRepository") class ArticleRepository( private val jdbcTemplate: JdbcTemplate ) { - fun findByIdGreaterThan(articleId: Long): List { + fun findByIdGreaterThan(articleId: Long): List { val sql = """ SELECT a.article_id, a.blog_id, a.title, a.link, a.pub_date, b.name FROM article a diff --git a/batch/schedules/src/main/kotlin/site/techmoa/batch/schedules/repository/LastScannedArticleRepository.kt b/worker/scheduler/src/main/kotlin/site/techmoa/worker/scheduler/repository/LastScannedArticleRepository.kt similarity index 92% rename from batch/schedules/src/main/kotlin/site/techmoa/batch/schedules/repository/LastScannedArticleRepository.kt rename to worker/scheduler/src/main/kotlin/site/techmoa/worker/scheduler/repository/LastScannedArticleRepository.kt index 7ed6b69..1eb90a5 100644 --- a/batch/schedules/src/main/kotlin/site/techmoa/batch/schedules/repository/LastScannedArticleRepository.kt +++ b/worker/scheduler/src/main/kotlin/site/techmoa/worker/scheduler/repository/LastScannedArticleRepository.kt @@ -1,9 +1,10 @@ -package site.techmoa.batch.schedules.repository +package site.techmoa.worker.scheduler.repository import org.springframework.dao.EmptyResultDataAccessException import org.springframework.jdbc.core.JdbcTemplate import org.springframework.stereotype.Repository -import site.techmoa.batch.schedules.dto.LastScannedArticleDto +import site.techmoa.worker.scheduler.dto.LastScannedArticleDto + import java.sql.Timestamp import java.time.Instant diff --git a/batch/schedules/src/main/kotlin/site/techmoa/batch/schedules/repository/OutboxRepository.kt b/worker/scheduler/src/main/kotlin/site/techmoa/worker/scheduler/repository/OutboxRepository.kt similarity index 96% rename from batch/schedules/src/main/kotlin/site/techmoa/batch/schedules/repository/OutboxRepository.kt rename to worker/scheduler/src/main/kotlin/site/techmoa/worker/scheduler/repository/OutboxRepository.kt index 55699ca..f95a027 100644 --- a/batch/schedules/src/main/kotlin/site/techmoa/batch/schedules/repository/OutboxRepository.kt +++ b/worker/scheduler/src/main/kotlin/site/techmoa/worker/scheduler/repository/OutboxRepository.kt @@ -1,4 +1,4 @@ -package site.techmoa.batch.schedules.repository +package site.techmoa.worker.scheduler.repository import org.springframework.jdbc.core.JdbcTemplate import org.springframework.jdbc.core.RowMapper @@ -83,14 +83,14 @@ class OutboxRepository( jdbcTemplate.update( sql, - OutboxStatus.FAILED.name, + OutboxStatus.FAIL.name, errorMessage, outboxId ) } @Transactional - fun claimPending(batchSize: Int): List { + fun claimPending(batchSize: Int = 100): List { val pendingMessages = findPending(batchSize) markPublishing(pendingMessages.map { it.outboxMessageId }) return pendingMessages diff --git a/batch/schedules/src/main/kotlin/site/techmoa/batch/schedules/repository/WebhookRepository.kt b/worker/scheduler/src/main/kotlin/site/techmoa/worker/scheduler/repository/WebhookRepository.kt similarity index 97% rename from batch/schedules/src/main/kotlin/site/techmoa/batch/schedules/repository/WebhookRepository.kt rename to worker/scheduler/src/main/kotlin/site/techmoa/worker/scheduler/repository/WebhookRepository.kt index cd8d61b..8bfb39f 100644 --- a/batch/schedules/src/main/kotlin/site/techmoa/batch/schedules/repository/WebhookRepository.kt +++ b/worker/scheduler/src/main/kotlin/site/techmoa/worker/scheduler/repository/WebhookRepository.kt @@ -1,9 +1,9 @@ -package site.techmoa.batch.schedules.repository +package site.techmoa.worker.scheduler.repository +import org.springframework.dao.EmptyResultDataAccessException import org.springframework.jdbc.core.JdbcTemplate import org.springframework.jdbc.core.RowMapper import org.springframework.stereotype.Repository -import org.springframework.dao.EmptyResultDataAccessException import site.techmoa.domain.model.Webhook import site.techmoa.domain.model.WebhookPlatform import site.techmoa.domain.model.WebhookValidity diff --git a/batch/schedules/src/main/kotlin/site/techmoa/batch/schedules/service/OutboxDispatchService.kt b/worker/scheduler/src/main/kotlin/site/techmoa/worker/scheduler/service/OutboxDispatchService.kt similarity index 71% rename from batch/schedules/src/main/kotlin/site/techmoa/batch/schedules/service/OutboxDispatchService.kt rename to worker/scheduler/src/main/kotlin/site/techmoa/worker/scheduler/service/OutboxDispatchService.kt index 8be12e9..5565497 100644 --- a/batch/schedules/src/main/kotlin/site/techmoa/batch/schedules/service/OutboxDispatchService.kt +++ b/worker/scheduler/src/main/kotlin/site/techmoa/worker/scheduler/service/OutboxDispatchService.kt @@ -1,39 +1,35 @@ -package site.techmoa.batch.schedules.service +package site.techmoa.worker.scheduler.service import com.fasterxml.jackson.databind.ObjectMapper import com.fasterxml.jackson.module.kotlin.readValue import org.slf4j.LoggerFactory import org.springframework.stereotype.Service -import site.techmoa.batch.schedules.repository.ArticleRepository -import site.techmoa.batch.schedules.repository.OutboxRepository -import site.techmoa.batch.schedules.repository.WebhookRepository import site.techmoa.domain.event.OutboxMessages.NewArticlesOutboxMessage.OutboxPayload -import site.techmoa.domain.event.WebhookGatewayPort +import site.techmoa.domain.event.WebhookMessageBroker +import site.techmoa.worker.scheduler.repository.ArticleRepository +import site.techmoa.worker.scheduler.repository.OutboxRepository +import site.techmoa.worker.scheduler.repository.WebhookRepository @Service class OutboxDispatchService( private val outboxRepository: OutboxRepository, private val articleRepository: ArticleRepository, private val webhookRepository: WebhookRepository, - private val webhookGatewayPort: WebhookGatewayPort, + private val messageBroker: WebhookMessageBroker, private val objectMapper: ObjectMapper ) { private val log = LoggerFactory.getLogger(javaClass) - companion object { - const val BATCH_SIZE = 10 - } - fun dispatchPending() { - val pendingMessages = outboxRepository.claimPending(BATCH_SIZE) + val pendingMessages = outboxRepository.claimPending() if (pendingMessages.isEmpty()) return pendingMessages.forEach { message -> try { val json = message.payload val payload = objectMapper.readValue(json) - webhookGatewayPort.publish(payload) + messageBroker.publish(payload) outboxRepository.markSuccess(message.outboxMessageId) } catch (ex: Exception) { log.error( diff --git a/batch/schedules/src/main/kotlin/site/techmoa/batch/schedules/service/ScanNewArticlesService.kt b/worker/scheduler/src/main/kotlin/site/techmoa/worker/scheduler/service/ScanNewArticlesService.kt similarity index 82% rename from batch/schedules/src/main/kotlin/site/techmoa/batch/schedules/service/ScanNewArticlesService.kt rename to worker/scheduler/src/main/kotlin/site/techmoa/worker/scheduler/service/ScanNewArticlesService.kt index c1dff7c..804b3a1 100644 --- a/batch/schedules/src/main/kotlin/site/techmoa/batch/schedules/service/ScanNewArticlesService.kt +++ b/worker/scheduler/src/main/kotlin/site/techmoa/worker/scheduler/service/ScanNewArticlesService.kt @@ -1,12 +1,12 @@ -package site.techmoa.batch.schedules.service +package site.techmoa.worker.scheduler.service import org.slf4j.LoggerFactory import org.springframework.stereotype.Service import org.springframework.transaction.annotation.Transactional -import site.techmoa.batch.schedules.repository.ArticleRepository -import site.techmoa.batch.schedules.repository.WebhookRepository -import site.techmoa.batch.schedules.usecase.LastScannedArticleUseCase -import site.techmoa.batch.schedules.usecase.NewArticlesEventUseCase +import site.techmoa.worker.scheduler.repository.ArticleRepository +import site.techmoa.worker.scheduler.repository.WebhookRepository +import site.techmoa.worker.scheduler.usecase.LastScannedArticleUseCase +import site.techmoa.worker.scheduler.usecase.NewArticlesEventUseCase @Service class ScanNewArticlesService( diff --git a/batch/schedules/src/main/kotlin/site/techmoa/batch/schedules/trigger/local/LocalPublishMessageTrigger.kt b/worker/scheduler/src/main/kotlin/site/techmoa/worker/scheduler/trigger/local/LocalPublishMessageTrigger.kt similarity index 72% rename from batch/schedules/src/main/kotlin/site/techmoa/batch/schedules/trigger/local/LocalPublishMessageTrigger.kt rename to worker/scheduler/src/main/kotlin/site/techmoa/worker/scheduler/trigger/local/LocalPublishMessageTrigger.kt index d1cf3e1..7fd3827 100644 --- a/batch/schedules/src/main/kotlin/site/techmoa/batch/schedules/trigger/local/LocalPublishMessageTrigger.kt +++ b/worker/scheduler/src/main/kotlin/site/techmoa/worker/scheduler/trigger/local/LocalPublishMessageTrigger.kt @@ -1,10 +1,10 @@ -package site.techmoa.batch.schedules.trigger.local +package site.techmoa.worker.scheduler.trigger.local import org.springframework.context.annotation.Profile import org.springframework.scheduling.annotation.Scheduled import org.springframework.stereotype.Component -import site.techmoa.batch.schedules.service.OutboxDispatchService -import site.techmoa.batch.schedules.trigger.support.OutboxDispatchMessageTriggerSupport +import site.techmoa.worker.scheduler.service.OutboxDispatchService +import site.techmoa.worker.scheduler.trigger.support.OutboxDispatchMessageTriggerSupport @Component @Profile("local") diff --git a/batch/schedules/src/main/kotlin/site/techmoa/batch/schedules/trigger/local/LocalRecordOutboxMessageTrigger.kt b/worker/scheduler/src/main/kotlin/site/techmoa/worker/scheduler/trigger/local/LocalRecordOutboxMessageTrigger.kt similarity index 71% rename from batch/schedules/src/main/kotlin/site/techmoa/batch/schedules/trigger/local/LocalRecordOutboxMessageTrigger.kt rename to worker/scheduler/src/main/kotlin/site/techmoa/worker/scheduler/trigger/local/LocalRecordOutboxMessageTrigger.kt index 1a98527..6c5cb5e 100644 --- a/batch/schedules/src/main/kotlin/site/techmoa/batch/schedules/trigger/local/LocalRecordOutboxMessageTrigger.kt +++ b/worker/scheduler/src/main/kotlin/site/techmoa/worker/scheduler/trigger/local/LocalRecordOutboxMessageTrigger.kt @@ -1,10 +1,10 @@ -package site.techmoa.batch.schedules.trigger.local +package site.techmoa.worker.scheduler.trigger.local import org.springframework.context.annotation.Profile import org.springframework.scheduling.annotation.Scheduled import org.springframework.stereotype.Component -import site.techmoa.batch.schedules.service.ScanNewArticlesService -import site.techmoa.batch.schedules.trigger.support.RecordOutboxMessageTriggerSupport +import site.techmoa.worker.scheduler.service.ScanNewArticlesService +import site.techmoa.worker.scheduler.trigger.support.RecordOutboxMessageTriggerSupport @Component @Profile("local") diff --git a/batch/schedules/src/main/kotlin/site/techmoa/batch/schedules/trigger/prod/ProdPublishMessageTrigger.kt b/worker/scheduler/src/main/kotlin/site/techmoa/worker/scheduler/trigger/prod/ProdPublishMessageTrigger.kt similarity index 72% rename from batch/schedules/src/main/kotlin/site/techmoa/batch/schedules/trigger/prod/ProdPublishMessageTrigger.kt rename to worker/scheduler/src/main/kotlin/site/techmoa/worker/scheduler/trigger/prod/ProdPublishMessageTrigger.kt index a7655b3..f558b07 100644 --- a/batch/schedules/src/main/kotlin/site/techmoa/batch/schedules/trigger/prod/ProdPublishMessageTrigger.kt +++ b/worker/scheduler/src/main/kotlin/site/techmoa/worker/scheduler/trigger/prod/ProdPublishMessageTrigger.kt @@ -1,10 +1,10 @@ -package site.techmoa.batch.schedules.trigger.prod +package site.techmoa.worker.scheduler.trigger.prod import org.springframework.context.annotation.Profile import org.springframework.scheduling.annotation.Scheduled import org.springframework.stereotype.Component -import site.techmoa.batch.schedules.service.OutboxDispatchService -import site.techmoa.batch.schedules.trigger.support.OutboxDispatchMessageTriggerSupport +import site.techmoa.worker.scheduler.service.OutboxDispatchService +import site.techmoa.worker.scheduler.trigger.support.OutboxDispatchMessageTriggerSupport @Component @Profile("prod") diff --git a/batch/schedules/src/main/kotlin/site/techmoa/batch/schedules/trigger/prod/ProdRecordOutboxMessageTrigger.kt b/worker/scheduler/src/main/kotlin/site/techmoa/worker/scheduler/trigger/prod/ProdRecordOutboxMessageTrigger.kt similarity index 71% rename from batch/schedules/src/main/kotlin/site/techmoa/batch/schedules/trigger/prod/ProdRecordOutboxMessageTrigger.kt rename to worker/scheduler/src/main/kotlin/site/techmoa/worker/scheduler/trigger/prod/ProdRecordOutboxMessageTrigger.kt index 59ecf36..ca79b98 100644 --- a/batch/schedules/src/main/kotlin/site/techmoa/batch/schedules/trigger/prod/ProdRecordOutboxMessageTrigger.kt +++ b/worker/scheduler/src/main/kotlin/site/techmoa/worker/scheduler/trigger/prod/ProdRecordOutboxMessageTrigger.kt @@ -1,10 +1,10 @@ -package site.techmoa.batch.schedules.trigger.prod +package site.techmoa.worker.scheduler.trigger.prod import org.springframework.context.annotation.Profile import org.springframework.scheduling.annotation.Scheduled import org.springframework.stereotype.Component -import site.techmoa.batch.schedules.service.ScanNewArticlesService -import site.techmoa.batch.schedules.trigger.support.RecordOutboxMessageTriggerSupport +import site.techmoa.worker.scheduler.service.ScanNewArticlesService +import site.techmoa.worker.scheduler.trigger.support.RecordOutboxMessageTriggerSupport @Component @Profile("prod") diff --git a/batch/schedules/src/main/kotlin/site/techmoa/batch/schedules/trigger/support/OutboxDispatchMessageTriggerSupport.kt b/worker/scheduler/src/main/kotlin/site/techmoa/worker/scheduler/trigger/support/OutboxDispatchMessageTriggerSupport.kt similarity index 88% rename from batch/schedules/src/main/kotlin/site/techmoa/batch/schedules/trigger/support/OutboxDispatchMessageTriggerSupport.kt rename to worker/scheduler/src/main/kotlin/site/techmoa/worker/scheduler/trigger/support/OutboxDispatchMessageTriggerSupport.kt index c8c76a9..ab519d5 100644 --- a/batch/schedules/src/main/kotlin/site/techmoa/batch/schedules/trigger/support/OutboxDispatchMessageTriggerSupport.kt +++ b/worker/scheduler/src/main/kotlin/site/techmoa/worker/scheduler/trigger/support/OutboxDispatchMessageTriggerSupport.kt @@ -1,7 +1,7 @@ -package site.techmoa.batch.schedules.trigger.support +package site.techmoa.worker.scheduler.trigger.support import org.slf4j.LoggerFactory -import site.techmoa.batch.schedules.service.OutboxDispatchService +import site.techmoa.worker.scheduler.service.OutboxDispatchService abstract class OutboxDispatchMessageTriggerSupport( private val outboxDispatchService: OutboxDispatchService diff --git a/batch/schedules/src/main/kotlin/site/techmoa/batch/schedules/trigger/support/RecordOutboxMessageTriggerSupport.kt b/worker/scheduler/src/main/kotlin/site/techmoa/worker/scheduler/trigger/support/RecordOutboxMessageTriggerSupport.kt similarity index 87% rename from batch/schedules/src/main/kotlin/site/techmoa/batch/schedules/trigger/support/RecordOutboxMessageTriggerSupport.kt rename to worker/scheduler/src/main/kotlin/site/techmoa/worker/scheduler/trigger/support/RecordOutboxMessageTriggerSupport.kt index 5f8774a..d42d907 100644 --- a/batch/schedules/src/main/kotlin/site/techmoa/batch/schedules/trigger/support/RecordOutboxMessageTriggerSupport.kt +++ b/worker/scheduler/src/main/kotlin/site/techmoa/worker/scheduler/trigger/support/RecordOutboxMessageTriggerSupport.kt @@ -1,7 +1,7 @@ -package site.techmoa.batch.schedules.trigger.support +package site.techmoa.worker.scheduler.trigger.support import org.slf4j.LoggerFactory -import site.techmoa.batch.schedules.service.ScanNewArticlesService +import site.techmoa.worker.scheduler.service.ScanNewArticlesService abstract class RecordOutboxMessageTriggerSupport( private val service: ScanNewArticlesService diff --git a/batch/schedules/src/main/kotlin/site/techmoa/batch/schedules/usecase/LastScannedArticleUseCase.kt b/worker/scheduler/src/main/kotlin/site/techmoa/worker/scheduler/usecase/LastScannedArticleUseCase.kt similarity index 73% rename from batch/schedules/src/main/kotlin/site/techmoa/batch/schedules/usecase/LastScannedArticleUseCase.kt rename to worker/scheduler/src/main/kotlin/site/techmoa/worker/scheduler/usecase/LastScannedArticleUseCase.kt index b6972aa..9e92be7 100644 --- a/batch/schedules/src/main/kotlin/site/techmoa/batch/schedules/usecase/LastScannedArticleUseCase.kt +++ b/worker/scheduler/src/main/kotlin/site/techmoa/worker/scheduler/usecase/LastScannedArticleUseCase.kt @@ -1,8 +1,8 @@ -package site.techmoa.batch.schedules.usecase +package site.techmoa.worker.scheduler.usecase import org.springframework.stereotype.Component -import site.techmoa.batch.schedules.dto.LastScannedArticleDto -import site.techmoa.batch.schedules.repository.LastScannedArticleRepository +import site.techmoa.worker.scheduler.dto.LastScannedArticleDto +import site.techmoa.worker.scheduler.repository.LastScannedArticleRepository @Component class LastScannedArticleUseCase( diff --git a/batch/schedules/src/main/kotlin/site/techmoa/batch/schedules/usecase/NewArticlesEventUseCase.kt b/worker/scheduler/src/main/kotlin/site/techmoa/worker/scheduler/usecase/NewArticlesEventUseCase.kt similarity index 94% rename from batch/schedules/src/main/kotlin/site/techmoa/batch/schedules/usecase/NewArticlesEventUseCase.kt rename to worker/scheduler/src/main/kotlin/site/techmoa/worker/scheduler/usecase/NewArticlesEventUseCase.kt index ec794b5..7c4ceec 100644 --- a/batch/schedules/src/main/kotlin/site/techmoa/batch/schedules/usecase/NewArticlesEventUseCase.kt +++ b/worker/scheduler/src/main/kotlin/site/techmoa/worker/scheduler/usecase/NewArticlesEventUseCase.kt @@ -1,12 +1,12 @@ -package site.techmoa.batch.schedules.usecase +package site.techmoa.worker.scheduler.usecase import com.fasterxml.jackson.databind.ObjectMapper import org.springframework.context.ApplicationEventPublisher import org.springframework.stereotype.Component -import site.techmoa.batch.schedules.dto.NewArticleDto import site.techmoa.domain.event.OutboxMessages import site.techmoa.domain.event.OutboxMessages.NewArticlesOutboxMessage import site.techmoa.domain.model.Webhook +import site.techmoa.worker.scheduler.dto.NewArticleDto import java.time.format.DateTimeFormatter import java.util.*