이 프로젝트는 Apache Flink의 오픈소스 CDC(Change Data Capture) 커넥터를 사용하여 MySQL에서 MySQL로 실시간 데이터 동기화를 구현한 데모입니다. 로컬 MySQL에서 Google Cloud MySQL, Amazon RDS 등 다양한 MySQL 호환 데이터베이스로의 동기화도 지원합니다.
프로젝트 정보
- 기반 기술: Apache Flink (오픈소스)
- CDC 커넥터: Flink CDC Connector (오픈소스)
- 라이선스: Apache License 2.0
- GitHub 저장소: Flink CDC
지원되는 대상 데이터베이스
- 로컬 MySQL
- Google Cloud SQL for MySQL
- Amazon RDS for MySQL
- Azure Database for MySQL
- 기타 MySQL 호환 데이터베이스
Google Cloud MySQL 동기화 설정
1. Google Cloud MySQL 연결 설정
JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
.withUrl("jdbc:mysql://[INSTANCE_IP]/[DATABASE_NAME]")
.withDriverName("com.mysql.cj.jdbc.Driver")
.withUsername("[USERNAME]")
.withPassword("[PASSWORD]")
.build()
2. Google Cloud MySQL 접속 정보
- 인스턴스 IP: Google Cloud SQL 인스턴스의 IP 주소
- 데이터베이스: 동기화할 데이터베이스 이름
- 사용자: Cloud SQL 사용자 계정
- 비밀번호: Cloud SQL 사용자 비밀번호
3. Google Cloud MySQL 접근 설정
- Cloud SQL 인스턴스의 공개 IP 활성화
- 방화벽 규칙 설정 (3306 포트 허용)
- SSL 연결 설정 (권장)
4. 성능 고려사항
- 네트워크 대기 시간 최적화
- 배치 크기 조정
- Cloud SQL 인스턴스 크기 선택
- 지역 간 데이터 전송 비용 고려
요구사항
- Docker
- Java 11
- Gradle 8.0
Docker Compose 환경 설명
1. 서비스 구성
services:
mysql-source:
image: mysql:8.0
container_name: mysql-source
environment:
MYSQL_ROOT_PASSWORD: root
MYSQL_DATABASE: source_db
ports:
- "3306:3306"
volumes:
- ./init/source:/docker-entrypoint-initdb.d
command: --server-id=1 --log-bin=mysql-bin --binlog-format=ROW
mysql-sink:
image: mysql:8.0
container_name: mysql-sink
environment:
MYSQL_ROOT_PASSWORD: root
MYSQL_DATABASE: sink_db
ports:
- "3307:3306"
volumes:
- ./init/sink:/docker-entrypoint-initdb.d
2. 환경 상세 설명
2.1 MySQL Source 서버
- 포트: 3306 (기본 MySQL 포트)
- 데이터베이스: source_db
- 초기화 스크립트:
init/source/init.sql
- CDC 설정:
server-id=1
: MySQL 복제를 위한 고유 식별자log-bin=mysql-bin
: 바이너리 로깅 활성화binlog-format=ROW
: 행 기반 복제 설정
2.2 MySQL Sink 서버
- 포트: 3307 (소스와 구분을 위한 포트)
- 데이터베이스: sink_db
- 초기화 스크립트:
init/sink/init.sql
3. 볼륨 마운트
- 소스 DB 초기화 스크립트:
./init/source:/docker-entrypoint-initdb.d
- 싱크 DB 초기화 스크립트:
./init/sink:/docker-entrypoint-initdb.d
4. 네트워크 구성
- 소스 DB:
localhost:3306
- 싱크 DB:
localhost:3307
- 컨테이너 간 통신: Docker 내부 네트워크 사용
테스트 데이터베이스 구성
1. 소스 데이터베이스 (source_db)
-- init/source/init.sql
CREATE TABLE customers (
id INT PRIMARY KEY,
name VARCHAR(255),
email VARCHAR(255),
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
-- 테스트 데이터
INSERT INTO customers (id, name, email) VALUES
(1, 'John Doe', 'john@example.com'),
(2, 'Jane Smith', 'jane@example.com');
2. 싱크 데이터베이스 (sink_db)
-- init/sink/init.sql
CREATE TABLE customers (
id INT PRIMARY KEY,
name VARCHAR(255),
email VARCHAR(255),
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
3. 테스트 시나리오
3.1 기본 동기화 테스트
- 소스 DB에 데이터 추가:
INSERT INTO source_db.customers (id, name, email) VALUES (3, 'Test User', 'test@example.com');
- 싱크 DB에서 확인:
SELECT * FROM sink_db.customers;
3.2 업데이트 테스트
- 소스 DB에서 데이터 수정:
UPDATE source_db.customers SET email = 'updated@example.com' WHERE id = 1;
- 싱크 DB에서 변경 확인:
SELECT * FROM sink_db.customers WHERE id = 1;
3.3 삭제 테스트
- 소스 DB에서 데이터 삭제:
DELETE FROM source_db.customers WHERE id = 2;
- 싱크 DB에서 삭제 확인:
SELECT * FROM sink_db.customers WHERE id = 2;
4. 테스트 데이터 모니터링
4.1 바이너리 로그 확인
# 소스 DB 컨테이너 내부에서
mysqlbinlog /var/lib/mysql/mysql-bin.000001
4.2 CDC 상태 확인
-- 소스 DB에서
SHOW MASTER STATUS;
SHOW SLAVE STATUS;
시작하기
- Docker 컨테이너 실행:
docker-compose up -d
- 컨테이너 상태 확인:
docker-compose ps
- 로그 확인:
docker-compose logs -f
- 프로젝트 빌드:
./gradlew build
- Flink CDC 작업 실행:
./gradlew run
테스트 실행
단위 테스트를 실행하려면:
./gradlew test
구조
docker-compose.yml
: MySQL 소스와 싱크 인스턴스 설정init/source/init.sql
: 소스 MySQL 초기화 스크립트init/sink/init.sql
: 싱크 MySQL 초기화 스크립트src/main/java/com/example/MysqlCdcJob.java
: Flink CDC 작업 구현src/test/java/com/example/MysqlCdcJobTest.java
: 단위 테스트
문제 해결
- 컨테이너 시작 실패
- 로그 확인:
docker-compose logs
- 포트 충돌 확인
- 볼륨 마운트 권한 확인
- 로그 확인:
- CDC 동기화 문제
- 바이너리 로그 설정 확인
- 네트워크 연결 확인
- 권한 설정 확인
- 성능 이슈
- 메모리 설정 확인
- 네트워크 대역폭 확인
- 배치 사이즈 조정
'나의 IT 기억' 카테고리의 다른 글
🤷♂️ 신입 개발자들이 가장 궁금해하는 질문들 - 현직 개발자의 솔직한 답변 (0) | 2025.05.27 |
---|---|
🚀 MCP Server - 나의 동료에게 설명하는 MCP (7) | 2025.05.27 |
SaaS 개발하면서 **멀티테넌시** 구성이 생각보다 복잡하더라고.. (1) | 2025.05.25 |
동료에게 배운 새로운 기술: NGINX 트래픽 미러링으로 안전한 마이그레이션 하기 (1) | 2025.05.24 |
MySQL에서 PostgreSQL로의 실시간 데이터 동기화 시스템 구축기 (1) | 2025.05.24 |