Redis Stream: 메시지 브로커 구현과 관리
스트림 데이터 저장/조회, 소비자 그룹 운영, ACK와 메시지 재할당, 상태 모니터링
Redis Stream: 메시지 브로커 구현과 관리
1. 레디스 Stream
- Stream은 레디스 5.0에서 새로 추가된 자료 구조로 대용량, 대규모의 메시징 데이터를 빠르게 처리할 수 있도록 설계됐다
- 일반적으로 로그가 파일의 내용을 업데이트하거나 지우지 않고 쌓기만 하는 것처럼 stream 또한 데이터를 계속해서 추가하는 방식으로 저장되는(append-only) 자료 구조다
- stream의 두 가지 활용
- 백엔드 개발자는 stream을 대량의 데이터를 효율적으로 처리하는 플랫폼으로 활용
- 데이터 엔지니어는 stream을 여러 생산자가 생성한 데이터를 다양한 소비자가 처리할 수 있게 지원하는 데이터 저장소 및 중간 큐잉 시스템으로 활용
2. 데이터의 저장
2-1. 메시지의 저장과 식별
- 레디스에서는 하나의 stream 자료 구조가 하나의 stream을 의미한다
- 각 자료 구조가 하나의 키에 연결되는 것과 마찬가지로 stream 또한 하나의 키에 연결된 자료 구조다
- 레디스 stream에서 각 메시지는 시간과 관련된 유니크한 ID를 가진다
1
<millisecondsTime>-<sequenceNumber>
- ID는 위와 같이 밀리세컨드 파트와 시퀀스 파트로 나뉜다
- 밀리세컨드 파트
- 실제 stream에 아이템이 저정될 시점의 레디스 노드 로컬 시간
- 시퀀스 파트
- 동일 밀리세컨드 시간에 여러 아이템이 저장될 수 있으므로, 같은 밀리세컨드에 저장된 데이터의 순서를 의미한다
- 시퀀스 번호는 64bit
- ID 값이 곧 시간을 의미하기 때문에 시간을 이용해 특정 데이터를 검색할 수 있다
2-2. 스트림 생성과 데이터 입력
- 레디스에서는 따로 stream을 생성하는 과정은 필요하지 않으며, XADD 커맨드를 이용해 새로운 이름의 stream에 데이터를 저장하면 데이터의 저장과 동시에 stream 자료 구조가 생성된다
1
XADD Email * subject "first" body "hello?"
- 위 커맨드를 실행하면 Email이라는 이름의 stream이 생성된다
- 만약 기존에 같은 이름의 키가 존재했다면 이 커맨드는 기존 stream에 새로운 메시지를 추가하며, 존재하지 않았을 때는 Email이라는 이름의 키를 가진 새로운 stream 자료 구조를 생성한다
- * 필드는 저장되는 데이터의 ID를 의미하며, 이 값을 *로 입력할 경우 레디스에서 자동 생성되는 타임스탬프 ID를 사용하겠다는 것을 의미
- XADD 커맨드를 사용했을 때 반환되는 값이 바로 저장되는 데이터의 ID
- 메시지는 key-value 쌍으로 저장되며, 위 커맨드에서 subject하는 키에는 first 값이, body 키에는 hello? 값이 저장된다
- 자동으로 생성되는 ID가 아니라 서비스에서 기존에 사용하던 ID를 이용해 메시지를 구분하는 경우에는 ID 필드에 *가 아니라 직접 ID 값을 지정하면 된다
- 직접 지정하는 경우 이후 저장되는 stream의 ID는 이전에 저장됐던 ID 값보다 작은 값으로 지정할 수 없다
3. 데이터 조회
- 레디스 strean에서는 데이터를 두 가지 방식으로 읽을 수 있다
- 첫 번째는 카프카에서처럼 실시간으로 처리되는 데이터를 리스닝
- 두 번째는 ID를 이용해 필요한 데이터를 검색하는 방식
3-1. 실시간 리스닝
- XREAD 커맨드를 이용하면 실시간으로 stream에 저장되는 데이터를 읽어올 수 있다
1
XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] ID [ID ...]
- stream에 저장된 데이터를 처음부터 읽어오고, 새로운 메시지가 들어올 때까지 토픽을 계속 리스닝하면서 기다리기
1
XREAD BLOCK 0 STREAMS Email 0
- BLOCK 0은 더 이상 stream에서 가져올 데이터가 없더라도 연결을 끊지 말고 계속 stream을 리스닝하라는 의미
- 만약 커맨드를 실행한 이후의 메시지만을 가져오고 싶다면 0 대신 특수 ID인 $를 입력하면 된다
- 즉 $는 stream에 저장된 최대 ID를 의미
- 직접 ID 값을 지정해서 데이터 읽어오기
1
XREAD BLOCK 0 STREAMS Email 0 1659115180983
3-2. 특정 데이터 조회
- XRANGE 커맨드로 ID를 이용해 원하는 시간대의 데이터를 조회할 수 있다
1
XRANGE key start end [COUNT count]
- XREVRANGE는 XRANGE의 역순으로 데이터를 조회할 때 사용
1
XREVRANGE key end start [COUNT count]
- stream에 저장된 ID 중 가장 작은 ID 값을 지정하고 싶을 때는 -, 제일 마지막 ID 값을 지정하고 싶을 때는 + 기호 사용
1
XRANGE Email - +
4. 소비자와 소비자 그룹
- 같은 데이터를 여러 소비자에게 전달하는 것을 팬아웃
- 여러 소비자가 XREAD 커맨드를 실행하는 방식으로 팬아웃이 수행
- 레디스 stream에서는 데이터가 저장될 때마다 고유한 ID를 부여받아 순서대로 저장하기 때문에 소비자에게 데이터가 전달될 때 순서를 보장
- 카프카의 경우 유니크 키는 파티션 내에서만 보장되기 때문에 소비자가 여러 파티션에서 토픽을 읽어갈 때는 데이터의 순서를 보장할 수 없다
- 카프카에서 메시지 순서가 보장되도록 데이터를 처리하기 위해서는 소비자 그룹을 사용해야 한다
- 레디스 stream에서도 소비자 그룹이라는 개념이 존재하지만, 카프카에서와 다르다
- 레디스 stream은 카프카와는 달리 메시지가 전달되는 순서를 신경쓰지 않아도 된다
- 레디스 stream에서 소비자 그룹 내의 한 소비자는 다른 소비자가 아직 읽지 않은 데이터만을 읽어간다
- 소비자 그룹 생성
1
XGROUP CREATE Email EmailServiceGroup $
- Email stream을 읽어가는 EmailServiceGroup이라는 소비자 그룹 생성
- $는 현재 시점 이후의 데이터부터 리스닝하겠다는 의미
- 소비자 그룹을 이용해 데이터 읽어오기
1
XREADGROUP GROUP EmailServiceGroup emailService1 COUNT 1 STREAMS Email >
- EmailServiceGroup에 속한 emailService1이라는 이름의 소비자가 Email stream에 있는 1개의 메시지를 읽어온느 커맨드
- 매번 소비자가 소비자 그룹을 이용해 작업을 수행할 때마다 그룹 내에서 이 소비자를 고유하게 식별할 수 있는 이름을 지정해야 한다
- 다른 소비자에게 읽히지 않은 데이터가 있다면 데이터를 1개 가져오고, 없다면 nil 값을 반환한다
- COUNT 1
- 카프카와 다르게 레디스에서 각 소비자는 COUNT 커맨드를 이용해 소비할 메시지 개수를 직접 지정할 수 있다
- STREAMS Email >
- Email이라는 이름의 stream에서 다른 소비자에게 전달되지 않았던 새로운 메시지를 전달
- 0 또는 다른 숫자 ID를 입력하는 경우 새로운 메시지를 확인하는 것이 아닌, 입력한 ID보다 큰 ID 중 pending list에 속하는 메시지를 반환
5. ACK와 보류 리스트
- 여러 서비스가 메시지 브로커를 이용해 데이터를 처리할 때, 예상치 못한 장애로 인해 시스템이 종료되는 경우 이를 인지하고 재처리하는 기능이 필요
- 메시지 브로커는 각 소비자에게 어떤 메시지까지 전달됐고, 전달된 메시지의 처리 유무를 인지하고 있어야 한다
- 레디스 stream에서는 소비자 그룹에 속한 소비자가 메시지를 읽어가면 각 소비자별로 읽어간 메시지에 대한 리스트를 새로 생성하며, 마지막으로 읽어간 데이터의 ID로 last_delivered_id 값을 업데이트
- last_delivered_id 값은 해당 소비자 그룹에 마지막으로 전달한 ID가 무엇인지 파악해 동일한 메시지를 중복으로 전달하지 않기 위해 사용
- 레디스 stream은 소비자별로 pending list를 만들고, 어떤 소비자가 어떤 데이터를 읽어갔는지 인지
- 소비자가 stream에게 데이터가 처리됐다는 뜻의 ACK를 보내면 레디스 stream은 해당 소비자의 pending list에서 ACK를 받은 메시지를 삭제한다
- 현재 소비자 그룹에서 보류 중인 리스트 확인
1
XPENDING Email EmailServiceGroup
- 현재 소비자 그룹에서 ACK를 받지 못해 보류 중인 메시지의 개수/각각 보류 중인 메시지 ID의 최솟값, 최댓값/각 소비자별로 보류중인 리스트의 갯수를 반환한다
- 메시지 처리 여부 확인
1
XACK Email EmailServiceGroup 16959114481311-0
- 1이 반환된다면 Email stream의 EmailServiceGroup 그룹에 속한 소비자가 16959114481311-0 ID를 가진 메시지를 처리했다는 의미
- 메시지 보증 전략
- at most once
- 메시지를 최소 한 번 보내는 것
- 소비자는 메시지를 받자마자 실제 처리하기 전에 먼저 ACK를 보낸다
- 속도는 향상되지만 ACK를 보낸 뒤 실제로 처리하기 전에 소비자에게 문제가 생겨 서비스가 다운됐을 때는 데이터를 읽을 수 있다
- 메시지가 일부 손실되더라도 빠른 응답이 필요한 경우 선택
- at least once
- 소비자는 받은 메시지를 모두 처리한 뒤 ACK를 보낸다
- ACK 전송이 지연돼 실제로 메시지가 처리됐지만 ACK를 전송하기 전에 소비자가 종료되는 상황이 발생할 수도 있다
- 멱등성이 보장되는 서비스라면 상관 없지만, 그렇지 않을 경우 문제가 될 수 있다
- exactly once
- 모든 메시지가 무조건 한 번씩 전송되는 것을 보장
- 레디스 stream을 이용하면서 exactly once하게 메시지를 전송하고 싶다면 레디스의 set 등의 추가 자료 구조를 이용해 이미 처리된 메시지인지 아닌지를 확인하는 과정이 필요할 수 있다
- at most once
6. 메시지의 재할당
- 소비자 서버에 장애가 발생해 복구되지 않는다면, 해당 소비자가 처리하던 보류 중인 메시지들은 다른 소비자가 대신 처리해야 한다
- XCLAIM 커맨드를 이용하면 메시지 소유권을 다른 소비자에게 할당할 수 있다
1
XCLAIM Email EmailServiceGroup EmailService3 3600000
- XCLAIM 커맨드를 사용할 때는 최소 대기 시간(min-idle-time)을 지정해야 한다
- 메시지가 보류 상태로 머무른 시간이 최소 대기 시간을 초과한 경우에만 소유권을 변경할 수 있도록 해서 같은 메시지가 2개의 다른 소비자에게 중복으로 할당되는 것을 방지할 수 있다
6-1. 메시지의 자동 재할당
- 보류 중인 메시지를 확인하고 특정 소비자에게 직접 소유권을 재할당하는 직업이 자주 발생한다면 자동 재할당을 사용할 수 있다
- 소비자가 직접 보류했던 메시지 중 하나를 자동으로 가져와서 처리
1
XAUTOCLAIM Email EmailServiceGroup es1 3600000 0-0 count 1
- XAUTOCLAIM 커맨드는 할당 대기 중인 다음 메시지의 ID를 반환하는 방식으로 동작하기 때문에 반복적 호출을 가능하게 한다
- 더 이상 대기 중인 보류 메시지가 없을 경우, 0-0이 반환된다
- XCLAIM 커맨드에서처럼 직접 재할당할 메시지를 입력해주지 않아도 되기 때문에 간단하게 메시지의 재할당이 가능하다
6-2. 메시지의 수동 재할당
- stream 내의 각 메시지는 counter라는 값을 각각 가지고 있다
- XREADGROUP을 이용해 소비자에게 할당하거나 XCLAIM 커맨드를 이용해 재할당할 경우 counter 값은 1씩 증가
- counter가 특정 값에 도달하면 이 메시지를 특수한 다른 stream으로 보내 관리자가 추후에 처리하도록 할 수도 있다. 보통 이런 메시지를 dead letter라고 부른다
7. stream 상태 확인
- XINFO 커맨드를 이용해 어떤 소비자가 활성화됐는지, 보류된 메시지는 어떤 건지, 어떤 소비자 그룹이 메시지를 처리하고 있는지 등의 상태를 확인할 수 있다
- 특정 소비자 그룹에 속한 소비자 정보 조회
1
XINFO consumers Email EmailServiceGroup
- stream에 속한 소비자 그룹 list 조회
1
XINFO groups Email
This post is licensed under CC BY 4.0 by the author.