Post

Redis Stream: 메시지 브로커 구현과 관리

스트림 데이터 저장/조회, 소비자 그룹 운영, ACK와 메시지 재할당, 상태 모니터링

Redis Stream: 메시지 브로커 구현과 관리

1. 레디스 Stream

  • Stream은 레디스 5.0에서 새로 추가된 자료 구조로 대용량, 대규모의 메시징 데이터를 빠르게 처리할 수 있도록 설계됐다
  • 일반적으로 로그가 파일의 내용을 업데이트하거나 지우지 않고 쌓기만 하는 것처럼 stream 또한 데이터를 계속해서 추가하는 방식으로 저장되는(append-only) 자료 구조다
  • stream의 두 가지 활용
    • 백엔드 개발자는 stream을 대량의 데이터를 효율적으로 처리하는 플랫폼으로 활용
    • 데이터 엔지니어는 stream을 여러 생산자가 생성한 데이터를 다양한 소비자가 처리할 수 있게 지원하는 데이터 저장소 및 중간 큐잉 시스템으로 활용



2. 데이터의 저장

2-1. 메시지의 저장과 식별

  • 레디스에서는 하나의 stream 자료 구조가 하나의 stream을 의미한다
  • 각 자료 구조가 하나의 키에 연결되는 것과 마찬가지로 stream 또한 하나의 키에 연결된 자료 구조다
  • 레디스 stream에서 각 메시지는 시간과 관련된 유니크한 ID를 가진다
    1
    
    <millisecondsTime>-<sequenceNumber>
    
    • ID는 위와 같이 밀리세컨드 파트와 시퀀스 파트로 나뉜다
    • 밀리세컨드 파트
      • 실제 stream에 아이템이 저정될 시점의 레디스 노드 로컬 시간
    • 시퀀스 파트
      • 동일 밀리세컨드 시간에 여러 아이템이 저장될 수 있으므로, 같은 밀리세컨드에 저장된 데이터의 순서를 의미한다
      • 시퀀스 번호는 64bit
    • ID 값이 곧 시간을 의미하기 때문에 시간을 이용해 특정 데이터를 검색할 수 있다

2-2. 스트림 생성과 데이터 입력

  • 레디스에서는 따로 stream을 생성하는 과정은 필요하지 않으며, XADD 커맨드를 이용해 새로운 이름의 stream에 데이터를 저장하면 데이터의 저장과 동시에 stream 자료 구조가 생성된다
    1
    
    XADD Email * subject "first" body "hello?"
    
    • 위 커맨드를 실행하면 Email이라는 이름의 stream이 생성된다
    • 만약 기존에 같은 이름의 키가 존재했다면 이 커맨드는 기존 stream에 새로운 메시지를 추가하며, 존재하지 않았을 때는 Email이라는 이름의 키를 가진 새로운 stream 자료 구조를 생성한다
    • * 필드는 저장되는 데이터의 ID를 의미하며, 이 값을 *로 입력할 경우 레디스에서 자동 생성되는 타임스탬프 ID를 사용하겠다는 것을 의미
    • XADD 커맨드를 사용했을 때 반환되는 값이 바로 저장되는 데이터의 ID
    • 메시지는 key-value 쌍으로 저장되며, 위 커맨드에서 subject하는 키에는 first 값이, body 키에는 hello? 값이 저장된다
  • 자동으로 생성되는 ID가 아니라 서비스에서 기존에 사용하던 ID를 이용해 메시지를 구분하는 경우에는 ID 필드에 *가 아니라 직접 ID 값을 지정하면 된다
  • 직접 지정하는 경우 이후 저장되는 stream의 ID는 이전에 저장됐던 ID 값보다 작은 값으로 지정할 수 없다



3. 데이터 조회

  • 레디스 strean에서는 데이터를 두 가지 방식으로 읽을 수 있다
  • 첫 번째는 카프카에서처럼 실시간으로 처리되는 데이터를 리스닝
  • 두 번째는 ID를 이용해 필요한 데이터를 검색하는 방식

3-1. 실시간 리스닝

  • XREAD 커맨드를 이용하면 실시간으로 stream에 저장되는 데이터를 읽어올 수 있다
    1
    
    XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] ID [ID ...]
    
  • stream에 저장된 데이터를 처음부터 읽어오고, 새로운 메시지가 들어올 때까지 토픽을 계속 리스닝하면서 기다리기
    1
    
    XREAD BLOCK 0 STREAMS Email 0
    
    • BLOCK 0은 더 이상 stream에서 가져올 데이터가 없더라도 연결을 끊지 말고 계속 stream을 리스닝하라는 의미
    • 만약 커맨드를 실행한 이후의 메시지만을 가져오고 싶다면 0 대신 특수 ID인 $를 입력하면 된다
    • 즉 $는 stream에 저장된 최대 ID를 의미
  • 직접 ID 값을 지정해서 데이터 읽어오기
    1
    
    XREAD BLOCK 0 STREAMS Email 0 1659115180983
    

3-2. 특정 데이터 조회

  • XRANGE 커맨드로 ID를 이용해 원하는 시간대의 데이터를 조회할 수 있다
    1
    
    XRANGE key start end [COUNT count]
    
  • XREVRANGE는 XRANGE의 역순으로 데이터를 조회할 때 사용
    1
    
    XREVRANGE key end start [COUNT count]
    
  • stream에 저장된 ID 중 가장 작은 ID 값을 지정하고 싶을 때는 -, 제일 마지막 ID 값을 지정하고 싶을 때는 + 기호 사용
    1
    
    XRANGE Email - +
    



4. 소비자와 소비자 그룹

  • 같은 데이터를 여러 소비자에게 전달하는 것을 팬아웃
    • 여러 소비자가 XREAD 커맨드를 실행하는 방식으로 팬아웃이 수행
  • 레디스 stream에서는 데이터가 저장될 때마다 고유한 ID를 부여받아 순서대로 저장하기 때문에 소비자에게 데이터가 전달될 때 순서를 보장
    • 카프카의 경우 유니크 키는 파티션 내에서만 보장되기 때문에 소비자가 여러 파티션에서 토픽을 읽어갈 때는 데이터의 순서를 보장할 수 없다
    • 카프카에서 메시지 순서가 보장되도록 데이터를 처리하기 위해서는 소비자 그룹을 사용해야 한다
  • 레디스 stream에서도 소비자 그룹이라는 개념이 존재하지만, 카프카에서와 다르다
    • 레디스 stream은 카프카와는 달리 메시지가 전달되는 순서를 신경쓰지 않아도 된다
    • 레디스 stream에서 소비자 그룹 내의 한 소비자는 다른 소비자가 아직 읽지 않은 데이터만을 읽어간다
  • 소비자 그룹 생성
    1
    
    XGROUP CREATE Email EmailServiceGroup $
    
    • Email stream을 읽어가는 EmailServiceGroup이라는 소비자 그룹 생성
    • $는 현재 시점 이후의 데이터부터 리스닝하겠다는 의미
  • 소비자 그룹을 이용해 데이터 읽어오기
    1
    
    XREADGROUP GROUP EmailServiceGroup emailService1 COUNT 1 STREAMS Email >
    
    • EmailServiceGroup에 속한 emailService1이라는 이름의 소비자가 Email stream에 있는 1개의 메시지를 읽어온느 커맨드
    • 매번 소비자가 소비자 그룹을 이용해 작업을 수행할 때마다 그룹 내에서 이 소비자를 고유하게 식별할 수 있는 이름을 지정해야 한다
    • 다른 소비자에게 읽히지 않은 데이터가 있다면 데이터를 1개 가져오고, 없다면 nil 값을 반환한다
    • COUNT 1
      • 카프카와 다르게 레디스에서 각 소비자는 COUNT 커맨드를 이용해 소비할 메시지 개수를 직접 지정할 수 있다
    • STREAMS Email >
      • Email이라는 이름의 stream에서 다른 소비자에게 전달되지 않았던 새로운 메시지를 전달
      • 0 또는 다른 숫자 ID를 입력하는 경우 새로운 메시지를 확인하는 것이 아닌, 입력한 ID보다 큰 ID 중 pending list에 속하는 메시지를 반환



5. ACK와 보류 리스트

  • 여러 서비스가 메시지 브로커를 이용해 데이터를 처리할 때, 예상치 못한 장애로 인해 시스템이 종료되는 경우 이를 인지하고 재처리하는 기능이 필요
  • 메시지 브로커는 각 소비자에게 어떤 메시지까지 전달됐고, 전달된 메시지의 처리 유무를 인지하고 있어야 한다
  • 레디스 stream에서는 소비자 그룹에 속한 소비자가 메시지를 읽어가면 각 소비자별로 읽어간 메시지에 대한 리스트를 새로 생성하며, 마지막으로 읽어간 데이터의 ID로 last_delivered_id 값을 업데이트
    • last_delivered_id 값은 해당 소비자 그룹에 마지막으로 전달한 ID가 무엇인지 파악해 동일한 메시지를 중복으로 전달하지 않기 위해 사용
  • 레디스 stream은 소비자별로 pending list를 만들고, 어떤 소비자가 어떤 데이터를 읽어갔는지 인지
    • 소비자가 stream에게 데이터가 처리됐다는 뜻의 ACK를 보내면 레디스 stream은 해당 소비자의 pending list에서 ACK를 받은 메시지를 삭제한다
  • 현재 소비자 그룹에서 보류 중인 리스트 확인
    1
    
    XPENDING Email EmailServiceGroup
    
    • 현재 소비자 그룹에서 ACK를 받지 못해 보류 중인 메시지의 개수/각각 보류 중인 메시지 ID의 최솟값, 최댓값/각 소비자별로 보류중인 리스트의 갯수를 반환한다
  • 메시지 처리 여부 확인
    1
    
    XACK Email EmailServiceGroup 16959114481311-0
    
    • 1이 반환된다면 Email stream의 EmailServiceGroup 그룹에 속한 소비자가 16959114481311-0 ID를 가진 메시지를 처리했다는 의미
  • 메시지 보증 전략
    • at most once
      • 메시지를 최소 한 번 보내는 것
      • 소비자는 메시지를 받자마자 실제 처리하기 전에 먼저 ACK를 보낸다
      • 속도는 향상되지만 ACK를 보낸 뒤 실제로 처리하기 전에 소비자에게 문제가 생겨 서비스가 다운됐을 때는 데이터를 읽을 수 있다
      • 메시지가 일부 손실되더라도 빠른 응답이 필요한 경우 선택
    • at least once
      • 소비자는 받은 메시지를 모두 처리한 뒤 ACK를 보낸다
      • ACK 전송이 지연돼 실제로 메시지가 처리됐지만 ACK를 전송하기 전에 소비자가 종료되는 상황이 발생할 수도 있다
      • 멱등성이 보장되는 서비스라면 상관 없지만, 그렇지 않을 경우 문제가 될 수 있다
    • exactly once
      • 모든 메시지가 무조건 한 번씩 전송되는 것을 보장
      • 레디스 stream을 이용하면서 exactly once하게 메시지를 전송하고 싶다면 레디스의 set 등의 추가 자료 구조를 이용해 이미 처리된 메시지인지 아닌지를 확인하는 과정이 필요할 수 있다



6. 메시지의 재할당

  • 소비자 서버에 장애가 발생해 복구되지 않는다면, 해당 소비자가 처리하던 보류 중인 메시지들은 다른 소비자가 대신 처리해야 한다
  • XCLAIM 커맨드를 이용하면 메시지 소유권을 다른 소비자에게 할당할 수 있다
    1
    
    XCLAIM Email EmailServiceGroup EmailService3 3600000
    
    • XCLAIM 커맨드를 사용할 때는 최소 대기 시간(min-idle-time)을 지정해야 한다
    • 메시지가 보류 상태로 머무른 시간이 최소 대기 시간을 초과한 경우에만 소유권을 변경할 수 있도록 해서 같은 메시지가 2개의 다른 소비자에게 중복으로 할당되는 것을 방지할 수 있다

6-1. 메시지의 자동 재할당

  • 보류 중인 메시지를 확인하고 특정 소비자에게 직접 소유권을 재할당하는 직업이 자주 발생한다면 자동 재할당을 사용할 수 있다
  • 소비자가 직접 보류했던 메시지 중 하나를 자동으로 가져와서 처리
    1
    
    XAUTOCLAIM Email EmailServiceGroup es1 3600000 0-0 count 1
    
    • XAUTOCLAIM 커맨드는 할당 대기 중인 다음 메시지의 ID를 반환하는 방식으로 동작하기 때문에 반복적 호출을 가능하게 한다
    • 더 이상 대기 중인 보류 메시지가 없을 경우, 0-0이 반환된다
    • XCLAIM 커맨드에서처럼 직접 재할당할 메시지를 입력해주지 않아도 되기 때문에 간단하게 메시지의 재할당이 가능하다

6-2. 메시지의 수동 재할당

  • stream 내의 각 메시지는 counter라는 값을 각각 가지고 있다
  • XREADGROUP을 이용해 소비자에게 할당하거나 XCLAIM 커맨드를 이용해 재할당할 경우 counter 값은 1씩 증가
  • counter가 특정 값에 도달하면 이 메시지를 특수한 다른 stream으로 보내 관리자가 추후에 처리하도록 할 수도 있다. 보통 이런 메시지를 dead letter라고 부른다



7. stream 상태 확인

  • XINFO 커맨드를 이용해 어떤 소비자가 활성화됐는지, 보류된 메시지는 어떤 건지, 어떤 소비자 그룹이 메시지를 처리하고 있는지 등의 상태를 확인할 수 있다
  • 특정 소비자 그룹에 속한 소비자 정보 조회
    1
    
    XINFO consumers Email EmailServiceGroup
    
  • stream에 속한 소비자 그룹 list 조회
    1
    
    XINFO groups Email
    
This post is licensed under CC BY 4.0 by the author.