Study/Real-Time Machine Learning

2. Real-time 기계 학습을 위한 데이터 수집

zeah 2025. 3. 6. 15:34
반응형

이 장에서는 다음을 다룬다:

 

  • 실시간 이벤트 데이터 수집
  • 이벤트 기반 아키텍처를 사용하여 실시간 데이터를 스트리밍하고 저장
  • 실시간 머신러닝을 위한 이벤트 기반 아키텍처의 장점

모든 머신러닝 프로젝트는 데이터에서 패턴을 학습하는 머신러닝 모델을 사용하여 해결할 수 있는 문제로 시작한다. 사용할 수 있는 데이터와 수집할 수 있는 데이터는 프로젝트의 가능성을 정의한다. 실시간 머신러닝도 다르지 않다. 실시간 머신러닝 애플리케이션을 구축하기 시작하려면 사용할 수 있는 실시간 데이터가 무엇인지 이해하고 이를 사용하여 유용한 예측을 생성하는 방법을 알아야 한다.

이전 장에서는 실시간 데이터 인스턴스가 실시간 추론을 생성하고 온라인 모델을 훈련하는 데 어떻게 사용되는지 탐구했다. 이를 달성하기 위해서는 데이터가 먼저 원본에서 추론이 이루어지는 프로세스로 수집되고 전송되어야 한다. 이는 데이터 스트림으로 알려진 지속적인 데이터 전송을 처리하기 위한 전용 데이터 아키텍처가 필요하다. 실제로 데이터 스트림은 메시지 큐 또는 이벤트 스트림으로 구현된다. 메시지 큐와 이벤트 스트림은 2.2절에서 논의한다.

이 장에서는 실시간 데이터 소스에서 데이터 포인트를 수집하는 방법을 배운다. 그런 다음, 이 데이터를 이벤트 스트림에 저장하는 퍼블리셔를 작성하고, 스트림에서 실시간 및 과거 데이터를 읽는 구독자를 작성한다. 퍼블리셔와 구독자는 2.2절에서 논의할 것이다. 또한, 이벤트 기반 아키텍처가 실시간 데이터 스트리밍과 관련된 역압(backpressure) 및 확장성 문제를 해결하는 방법을 배운다. 이 장의 끝에서는 실시간 머신러닝을 위한 데이터 수집 메커니즘을 준비하게 될 것이다.

 

그림 2.1은 이벤트 기반 아키텍처를 사용하여 실시간 데이터 흐름을 설명하는 간단한 워크플로우 예시를 보여준다. 구체적으로, 실시간 데이터 소스에서 데이터를 수집하고 이 데이터를 주제(topic)에 게시하는 발행자(publisher)가 있다. 중간에 메시지 브로커(message broker)가 있어 발행자로부터 데이터를 주제를 구독한 구독자(subscriber)에게 라우팅한다.

Figure 2.1 Simple event-driven workflow


2.1 실시간 이벤트 수집

시스템이 진행 중인 항공편의 도착 시간을 예측하도록 하고 싶다고 가정하자. 이는 여행자들이 픽업 차량을 예약하거나 운전자가 공항에 도착할 최적의 시간을 결정하는 데 도움이 될 수 있다. 이 예측 시스템을 구축하기 위해서는 실시간 데이터 소스에서 데이터를 수집해야 한다. 시스템은 항공편 데이터를 스트리밍하고, 온라인 모델을 훈련하기 위해 진행 중인 날씨 업데이트와 같은 다른 실시간 데이터 소스와 통합할 필요가 있다. 또한, 시스템은 최종 사용자에게 실시간 업데이트를 제공하기 위해 항공편 예측을 스트리밍해야 한다.

이 섹션에서는 데이터 수집 부분에 초점을 맞출 것이다. 가장 간단한 형태로, 애플리케이션은 API로부터 데이터를 스트리밍하고 항공편 위치와 속도와 같은 중요한 이벤트를 캡처해야 한다. 데이터 이벤트를 처음부터 생성하고 이러한 이벤트를 실시간 데이터 스트림에 게시하는 방법을 배울 것이다.

다음 그림은 이 장의 결론에서 개발할 데이터 수집 애플리케이션의 아키텍처 다이어그램을 나타낸다.

Figure 2.2 Real-time flights data ingestion application



2.1.1 데이터 소스 선택

우리의 목표는 진행 중인 항공편의 도착 시간을 예측하는 나우캐스팅(nowcasting) 모델을 구축하는 것이다. 나우캐스팅은 예측(forecasting)과 유사하게 미래를 예측하려는 접근 방식이지만, 나우캐스팅은 최근의 트렌드에 크게 의존하여 가까운 미래의 값을 예측한다. 예를 들어, 허리케인이 어디에 상륙할지 또는 주식이 몇 시간 내에 어떤 방향으로 움직일지를 예측하는 것이다. 우리의 나우캐스팅 모델은 최근 항공편 위치를 고려하여 항공편이 언제 도착할지를 예측하려고 한다.


프로젝트를 위한 실시간 데이터 소스를 선택할 때 다음을 고려해야 한다.

이용 약관: 대부분의 API는 데이터 사용 방법에 대한 약관을 명시한다. 예를 들어, 일부 사용 계약은 데이터를 공개적으로 복사하거나 데이터를 사용하는 애플리케이션을 판매하는 것을 금지한다.

신뢰성: 실시간 데이터 프로젝트의 어려운 점 중 하나는 외부 서비스가 제대로 작동하기 위해 사용 가능해야 한다는 것이다. 데이터 소스를 선택할 때, API의 신뢰성을 평가하기 위해 초기 실사를 수행하는 것이 중요하다. 특히, 잘 유지되고 다른 사람들이 사용하는 데이터 소스를 선택하고자 한다. 이를 위한 특정 지표가 있다. 예를 들어, 신뢰할 수 있는 API는 예제가 포함된 자세한 문서를 가지고 있는 경우가 많다. API를 제공하는 조직이나 당사자가 공식 프로그래밍 클라이언트를 가지고 있고, GitHub 저장소가 잘 유지되고 있는지(예: 지난달에 업데이트됨, 유지 관리자가 문제에 응답함, 수백 개의 별이 있음) 확인하는 것이 좋은 신호다.

요율 제한: 대부분의 API는 스팸 공격을 방지하기 위해 일정한 요율 제한을 부과한다. 데이터 소스를 선택할 때 필요한 데이터의 양과 빈도를 결정하는 것이 필요하다. 또한, 실시간 데이터 소스의 제한을 처리하기 위한 전략을 사용할 것이다. 예를 들어, 이 장에서는 요청 요율 제한에 걸리지 않기 위해 폴링 간격 전략을 활용할 것이다.

데이터 품질: 외부 데이터 소스를 사용할 때는 제공되는 데이터에 제한된다. 예를 들어, "실시간"으로 광고된 데이터가 30초 간격으로만 제공될 수 있다. 사용 사례에 따라 얼마나 자주 새로운 데이터가 필요한지 이해하는 것이 중요하다. 가까운 미래의 주가 변동을 예측하려는 애플리케이션은 초 단위로 업데이트를 받아야 할 수 있지만, 월간 강수량을 예측하는 애플리케이션은 일일 업데이트만 필요할 수 있다. 다음 장에서는 온라인 모델링과 관련하여 실시간 데이터 기능의 타당성을 평가하는 방법을 탐구할 것이다.

데이터 형식: REST API는 비구조화된 JSON 데이터를 반환하는 경우가 많다. 데이터 수집 파이프라인의 목표 중 하나는 이 데이터를 정의된 구조로 신뢰성 있게 역직렬화할 수 있도록 하는 것이다. 외부 API를 사용할 때 잠재적인 문제는 데이터 스키마가 언제든지 변경될 수 있어 다운스트림 애플리케이션에 문제가 발생할 수 있다는 것이다. 이러한 이유로 가능한 한 공식 프로그래밍 언어 API를 사용하고, 필드 검증을 수행하여 호환성 문제를 쉽게 식별하고 수정하는 것이 중요하다.

우리의 나우캐스팅 모델을 구동하기 위해 진행 중인 항공편에 대한 데이터를 수집하고자 한다. 오프라인 학습을 위해서는 역사적인 항공편 데이터셋을 찾는 것으로 시작할 것이다. 반면, 우리의 모델은 예측을 위해 매우 최근의 데이터를 사용해야 한다. 따라서 항공편 데이터를 수집하는 실시간 데이터 소스가 필요하다. 다행히 OpenSky (https://opensky-network.org/) 네트워크는 상업 항공편을 포함한 진행 중인 항공 교통에 대한 실시간 비행 데이터를 제공한다. 작성 시점에서 이는 비영리 연구 목적으로 사용할 수 있다.

많은 API가 "실시간" API로 설명되지만 매우 다르게 설계될 수 있다는 점에 유의해야 한다. 일부는 업데이트를 푸시 기반 형식으로 제공하기 위해 장기 실행, 저지연 웹소켓을 활용하여 진정한 실시간이다. 이는 금융 분야에서 가장 일반적이며, 거래량이 많고 정보를 가능한 빨리 아는 것이 매우 가치가 있기 때문이다. 다른 API는 풀 기반이므로 데이터 업데이트를 위해 클라이언트가 주기적으로 요청해야 한다.


OpenSky는 REST 기반의 API를 제공하여 웹 요청을 할 수 있다. 특히, 진행 중인 항공편에 대한 "state vectors"를 반환하는 GET /states/all 경로에 관심이 있다. 내장된 Python의 urllib 라이브러리를 사용하여 API에 단일 요청을 하고 결과를 읽기 쉬운 텍스트로 출력할 것이다.

 from urllib import request
 
 with request.urlopen(“https://opensky- network.org/ api/states/ all”) as response:
 	print(response.read().decode(“utf-8”))

Listing 2.1 Querying current flight positions from OpenSky


이 코드의 출력은 항공편 위치의 거대한 목록이다.

…
 ["c00734","WJA2199","Canada",1728763865,1728763865,-70.7597,20.2082,10972.8,false,249.58,350.87,0.33,null,11658.6,nul
 l,false,0],
 ["a41b89","DAL799","United States",1728763866,1728763866,-105.9763,40.8415,10668,false,248.95,62.69,0.33,null,11102.34,null,fals
 e,0]


API는 모든 알려진 항공편의 현재 위치를 나타내는 JSON 인코딩 객체를 반환했다. 간단히 하기 위해 특정 지역으로 출력을 필터링하는 추가 매개변수를 API에 제공할 수 있다. 아래 수정된 코드는 위도와 경도 좌표를 사용하여 스위스 상공의 모든 현재 항공편을 쿼리하고, 결과를 더 쉽게 작업할 수 있는 Python 객체로 구문 분석한다.

 import json
 from urllib import request
 
 with request.urlopen(“https://opensky-network.org/api/states/all?lamin=45.8389&lomin=5.9962&lamax=47.8229&lomax=10.5226”) as response:
 	print(json.loads(response.read().decode(“utf-8”)))


아래는 요약된 출력이다.

{'time': 1728765860, 'states': [['4b1804', 'SWR560A ', 'Switzerland', 1728765859, 1728765859, 9.2725, 47.6737, 3970.02, False, 160.76, 254.41, -6.5, None, 4091.94, '1000', False, 0], ['4b1806', 'SWR2YA', 'Switzerland', 1728765773, 1728765837, 8.5633, 47.4417, 403.86, True, 0, 64.69, None, None, None, '2000', False, 0]]}

이제 관심 있는 데이터를 캡처하는 반복 가능한 코드를 갖게 되었으므로, 이 데이터를 실시간 기계 학습을 위한 개별 이벤트로 처리할 것이다.

2.1.2 실시간 이벤트 생성

이벤트는 무언가가 발생했다는 기록이다. 현재 예측 시스템의 맥락에서 이벤트는 비행기가 공항에 도착하거나 출발하는 것, 특정 지역의 날씨 업데이트와 같은 세계에서 발생한 특정한 일을 나타낼 수 있다. 그러나 이벤트는 애플리케이션 내부에서 발생하는 일을 나타낼 수도 있다. 예를 들어, 새로운 예측이 생성되면 시스템의 다른 부분에 전달되어야 한다. 이벤트 기반 시스템은 개별 이벤트의 데이터와 제어 신호를 전달함으로써 작동한다.

데이터 수집의 다음 단계는 쿼리된 비행 위치를 개별 이벤트로 변환하는 것이다. 최소한 각 이벤트는 데이터 페이로드와 이벤트가 발생한 시간을 나타내는 타임스탬프를 포함해야 한다. 시간에 민감한 시스템에서는 이벤트가 수신된 시점과 온라인 모델에 의해 최종적으로 처리되는 시점 사이에 지연이 발생하기 때문에 타임스탬프가 중요하다. 이벤트를 생성하는 것은 시스템의 다른 부분에서 해석할 수 있는 데이터 구조를 정의하는 것을 의미한다. 시작하기 위해 위치(위도와 경도), 속도, 방향(true_track), 출발 국가, 항공기의 고유 식별자(icao24)를 캡처하고자 할 수 있다. 아래 함수는 API 응답을 파싱하여 응답의 각 비행에 대한 업데이트 객체 목록을 반환한다. 응답이 시간 순서대로 보장되지 않기 때문에 time_position 타임스탬프에 따라 이벤트를 수동으로 정렬할 것이다.

이제 단일 API 응답을 일련의 이벤트로 파싱하는 함수를 갖게 되었다. 이 함수를 반복적으로 호출하여 현재 예측 시스템을 위한 실시간 데이터를 생성할 필요가 있다. 이벤트 기반 시스템 용어로 이것을 퍼블리셔(publisher)라고 한다. 퍼블리셔는 실시간 시스템의 다른 부분에 이벤트를 전달하는 장기 실행 프로세스이다. 우리의 경우, OpenSky API를 정기적으로 쿼리하고 시스템이 이해할 수 있는 이벤트를 생성하는 퍼블리셔가 필요하다.

Listing 2.3 비행 데이터로부터 이벤트 생성

#A API 응답의 모든 비행 상태를 반복한다.
#B 가장 이른 것부터 가장 늦은 것까지 정렬된 비행 이벤트 목록을 반환한다.

```python
def response_to_events(api_response):
flight_events = []
for update in api_response["states"]: #A
flight_events.append(
{
"icao24": update[0],
"origin_country": update[2],
"time_position": update[3],
"longitude": update[5],
"latitude": update[6],
"velocity": update[9],
"true_track": update[10]
}
)
return sorted(flight_events, key=lambda x: x["time_position"]) #B
```

Real-Time Machine Learning_MEAP_v01_p30

Page 1


--- Page 1 ---
데이터 API의 제약으로 인해 약간의 제한이 있다. HTTP/1.1은 풀 메커니즘이므로 API는 새로운 업데이트가 있을 때 알려주지 않는다. 유일한 방법은 주기적으로 서버에 폴링하여 항공편 업데이트를 가져오는 것이다. 퍼블리셔를 구현할 때 서버를 폴링할 간격을 결정해야 한다. 이 간격이 길수록 시스템은 실시간성이 떨어지게 되며, 오래된 데이터를 기반으로 운영하게 된다. 그러나 간격이 짧으면 API 유지 관리자가 부과한 속도 제한에 걸릴 위험이 있다. 아래 그림은 서로 다른 폴링 간격을 선택했을 때 얻을 수 있는 다양한 결과를 보여준다.

짧은 간격은 더 많은 데이터 포인트와 작은 변화를 생성하고, 긴 간격은 더 적은 데이터 포인트와 더 큰 변화를 생성한다.

Real-Time Machine Learning_MEAP_v01_p31

Page 1


--- Page 1 ---
이상적으로는 가능한 한 빨리 폴링하여 가장 실시간 데이터를 가져오는 것이 좋다. 그러나 실제로는 지속적으로 폴링하는 것이 컴퓨팅 자원을 낭비하고, 빠르게 속도 제한에 부딪히기 때문에 타협해야 한다. 특정 사용 사례에 적합한 폴링 간격을 어떻게 결정할 수 있을까? 첫 번째로 살펴볼 것은 API의 문서이다. 문서에는 기본 데이터가 얼마나 자주 업데이트되는지와 하루/시간/분당 허용되는 요청 수가 명시되어 있어야 한다. 이는 적절한 간격을 결정하는 데 좋은 출발점을 제공한다. 예를 들어, 작성 시점에 OpenSky 문서에 따르면 인증되지 않은 사용자는 10초마다 새로운 데이터를 가져올 수 있으며, 500x500 km 지역 내에서 24시간 동안 400개의 /api/states/all 요청이 허용된다. 따라서 최신 데이터를 얻기 위해 시스템을 10초마다 폴링할 수 있지만, 시스템을 하루 종일 실행하려면 폴링 간격이 24 * 60 / 400 = 3.6분 이상이어서는 안 된다. 일반적으로 30초에서 5분 사이가 좋은 출발점이다. 그러나 이는 데이터의 중요성과 데이터가 얼마나 자주 변경될 가능성이 있는지에 따라 다르다. 경보 시스템 및 금융 시장과 같은 시간에 민감한 사용 사례의 경우 폴링 간격은 1분 미만이어야 한다. 서버 상태 모니터링이나 날씨 추적과 같이 더 천천히 변하거나 안정적인 시스템의 경우 적절한 간격은 5분에 가깝다.

10초 간격으로 시작하여 폴링이 실제로 어떻게 작동하는지 확인할 것이다. 아래 코드는 정의된 간격으로 API를 쿼리하고 응답 데이터를 이벤트의 반복 가능한 형태로 변환하는 이벤트 생성기이다.

이 생성기는 Python의 for 루프에서 반복하여 이벤트를 생성할 수 있다.

이 코드를 실행한 출력은 다음과 같다.

Listing 2.4 비행 업데이트 이벤트 생성기
#A 세 번의 반복 후 실행을 중지한다.
#B yield 키워드는 이 함수를 루프의 매 반복마다 이벤트를 반환하는 생성기로 만든다.
#C API를 스팸하지 않기 위해 시간 간격 동안 대기한다.
import time
from urllib import request
def get_events(url="https://opensky-network.org/api/states/all?lamin=45.8389&lomin=5.9962&lamax=47.8229&lomax=10.5226", interval_sec=10):
for _ in range(3): #A
with request.urlopen(url) as response:
yield from response_to_events(json.loads(response.read().decode("utf-8"))) #B
time.sleep(interval_sec) #C

Listing 2.5 이벤트 생성기 호출
for event in get_events():
print(event)

Real-Time Machine Learning_MEAP_v01_p32

Page 1


--- Page 1 ---
{'icao24': '4d21ef', 'origin_country': 'Malta', 'time_position': 1729024220, 'longitude': 8.0245, 'latitude': 46.7367, 'velocity': 228.51, 'true_track': 144.81}

{'icao24': '3986e1', 'origin_country': 'France', 'time_position': 1729024220, 'longitude': 6.5155, 'latitude': 47.0584, 'velocity': 209.11, 'true_track': 210.29}

{'icao24': '4d223b', 'origin_country': 'Malta', 'time_position': 1729024220, 'longitude': 7.23, 'latitude': 47.5584, 'velocity': 219.62, 'true_track': 145.3}

출력은 연속적이지 않고 이벤트 기반이다. 트리거 이벤트는 시간 기반이며, 매 분마다 API가 호출될 때 새로운 이벤트 배치가 생성된다. 세 번의 배치 후 생성기가 반환되고 프로세스가 종료된다. 실제 프로젝트에서는 생성기가 무한히 쿼리할 것이다. 이는 프로세스가 결국 API 속도 제한에 도달할 수 있음을 의미한다. 장시간 실행되는 시스템은 이러한 오류를 감지하고 백오프(backoff) 전략을 구현해야 한다. 백오프 전략은 이와 같은 자동 쿼리 시스템이 지속적인 오류 루프에 빠지지 않도록 돕는다. 백오프 전략을 구현하는 방법은 4장에서 완전한 비행 예측 시스템을 구축할 때 논의할 것이다.

마지막으로 데이터를 저장할 필요가 있다. 오프라인 모델링을 위해 일반적으로 데이터 인스턴스를 데이터베이스와 같은 쿼리 가능한 저장 형식으로 유지한다. 현재는 각 줄이 JSON 형식으로 인코딩된 단일 비행 이벤트를 나타내는 JSON 라인 파일에 이벤트를 덤프하여 이를 시뮬레이션할 것이다. 이 시점에서 모든 기능을 FlightPublisher 클래스에 공식화할 것이다. 이는 구성된 객체로 퍼블리셔를 쉽게 시작할 수 있게 한다.

초기 비행 데이터 퍼블리셔
```python
class FlightPublisherV1:
def __init__(self, url, interval_sec, file_path):
self.url = url
self.interval_sec = interval_sec
self.file_path = file_path
def response_to_events(self, api_response): #A
flight_events = [] #A
for update in api_response["states"]: #A
flight_events.append( #A
{
"icao24": update[0], #A
"origin_country": update[2], #A
"time_position": update[3], #A
"longitude": update[5], #A
"latitude": update[6], #A
"velocity": update[9], #A
"true_track": update[10], #A
} #A
) #A
return sorted(flight_events, key=lambda x: x["time_position"]) #A
```

Real-Time Machine Learning_MEAP_v01_p33

Page 1


--- Page 1 ---
다음 코드를 실행하면 주기적으로 비행 업데이트 이벤트 배치를 flight_updates.jsonl에 덤프한다.

현재 실시간 데이터 소스에서 데이터를 추출하고, 간단한 데이터 필터링을 수행하며, 데이터 이벤트의 순서가 있는 로그를 유지하는 퍼블리셔가 있다. 이는 오프라인 학습에서 일반적인 추출-변환-적재(ETL) 파이프라인을 반영한다. 그러나 온라인 학습에서는 이 이벤트 로그를 가능한 한 실시간에 가깝게 처리하는 데 관심이 있다. 온라인 모델은 이 데이터를 구문 분석하고 실시간 출력으로 추론을 생성해야 한다. 다음 섹션에서는 실시간으로 수집된 비행 데이터를 온라인 모델링을 위해 처리하는 스트리밍 ETL 파이프라인을 생성할 것이다.

2.2 실시간 데이터 처리

현재 비행 데이터는 두 가지 상태로 존재한다. FlightPublisher가 API에서 업데이트를 읽고 처리하면서 진정한 실시간 형식으로 존재한다. 그런 다음, 업데이트를 디스크에 이벤트 로그로 유지하여 데이터가 역사적인 형식이 된다. 역사적인 형식은 디버깅에 유용하고 데이터를 이해하는 데 도움이 되지만, 온라인 모델링에는 그다지 유용하지 않다.

온라인 모델을 훈련하고 활용하기 위해서는 별도의 프로세스 간에 실시간 데이터를 통신할 수 있어야 한다. 이 섹션에서는 이벤트 기반 아키텍처의 기본 구성 요소를 배우고 온라인 모델링을 준비하기 위해 실시간 수집 구현을 완료할 것이다.

#A API 응답에서 개별 비행 업데이트를 추출한다.
#B API를 폴링하고 비행 업데이트 이벤트를 생성하는 제너레이터이다.
#C 퍼블리셔를 계속 실행하고 이벤트를 파일에 추가한다.
def get_events(self): #B
while True: #B
with request.urlopen(self.url) as response: #B
yield from self.response_to_events( #B
json.loads(response.read().decode("utf-8")) #B
) #B
time.sleep(self.interval_sec) #B
def run(self): #C
for event in self.get_events(): #C
with open(self.file_path, "a") as file: #C
file.write(json.dumps(event) + "\n") #C

Listing 2.7 비행 데이터 퍼블리셔 호출
publisher = FlightPublisherV1(url="https://opensky-​network.org/​api/states/​all?lamin=45.​
8389&lomin=5.​9962&lamax=47.​8229&lomax=10.​5226", interval_sec=60,
file_path="flight_updates.jsonl")
publisher.run()

Real-Time Machine Learning_MEAP_v01_p34

Page 1


--- Page 1 ---
2.2.1 메시지 큐

현재 FlightPublisher는 읽을 수 있는 파일에 순서대로 이벤트를 출력하고 있다. 이것은 사실 메시지 큐의 기본적인 예이다. 메시지 큐는 서로 다른 프로세스가 비동기적으로 통신할 수 있도록 하는 버퍼이다. 메시지 큐에 쓰는 프로세스를 퍼블리셔(publisher) 또는 프로듀서(producer)라고 하고, 메시지 큐를 읽는 프로세스를 구독자(subscriber) 또는 소비자(consumer)라고 한다. 메시지 큐의 장점은 퍼블리셔와 구독자가 독립적으로 작동할 수 있다는 것이다. 예를 들어, 머신 러닝에 관한 잡지를 구매하고 싶다고 가정해 보자. 서점에 가서 이번 달의 ML Digest를 직접 구매할 수 있다. 이는 양측이 조율하여 거래를 완료해야 한다. 이는 OpenSky API에 요청을 보내고 단일 응답을 받는 것과 유사하다. 다른 방법은 잡지 구독을 구매하는 것이다. 이 경우, 잡지 배급자는 매달 발행물을 우편함에 넣는 것만 책임진다. 이는 FlightPublisher가 파일 시스템에 이벤트를 기록하는 것과 유사하다.

메시지 큐를 사용하여 비행 예측 시스템의 다양한 프로세스를 분리하고자 한다. 예를 들어, 이는 FlightPublisher가 구독자가 데이터를 처리하기를 기다리지 않고 새로운 비행 데이터를 계속 수집할 수 있도록 한다. 그러나 파일 버퍼 접근 방식에는 몇 가지 문제가 있다. 하나의 문제는 파일이 전통적으로 처음부터 끝까지 순차적으로 읽힌다는 것이다. 더 많은 이벤트가 게시될수록 파일의 크기가 증가하여 파일을 읽는 데 시간이 더 오래 걸린다. 또한, 가장 최근의 이벤트는 항상 끝에 위치한다. 독자가 새로운 이벤트를 수신하려면 파일의 최근 줄로 이동해야 한다. 이는 구독자가 마지막으로 읽은 위치를 추적해야 하며, 그렇지 않으면 중복된 이벤트를 수신하게 된다. 이러한 동작은 아래 그림에 설명되어 있다.

Real-Time Machine Learning_MEAP_v01_p35

Page 1


--- Page 1 ---
그림 2.5 구독자가 이벤트 로그를 처음 읽을 때 3개의 이벤트를 읽는다. 두 번째 읽기에서는 새로운 이벤트를 읽기 전에 3개의 이벤트를 앞으로 건너뛰어야 한다. 메시지 큐가 없으면 구독자는 중복 이벤트를 받지 않기 위해 마지막으로 읽은 오프셋을 추적해야 한다.

이제 파일 버퍼 접근 방식을 실제 메시지 큐를 사용하도록 업그레이드할 것이다. Apache Kafka, Google PubSub, RabbitMQ와 같은 많은 메시지 큐 솔루션이 있다. RabbitMQ를 사용할 것이다. 왜냐하면 최소한의 코드로 로컬에 큐를 설정할 수 있기 때문이다. 메시지 브로커로서 RabbitMQ는 발행자와 구독자로부터의 연결 요청을 수락하고, 큐 생성 처리, 발행자로부터 보낸 메시지를 올바른 큐로 전달하는 것뿐만 아니라, 발행자가 연결을 닫은 후에도 큐에 정보를 지속시키는 역할을 한다. 이는 발행자와 구독자가 비동기적으로 통신할 수 있게 해주기 때문에 중요하다. 이러한 분리는 시스템이 학습하고 추론을 생성하는 동안 데이터 수집이 계속될 수 있기 때문에 실시간 머신 러닝에 유용하다.

첫 번째 단계는 RabbitMQ 관리 서비스를 실행하는 것이다. 서비스를 시작하는 가장 쉬운 방법은 Docker 이미지를 가져와 실행하는 것이다. Docker가 없다면, 여기(https://docs.docker.com/get-started/get-docker/)의 지침을 따라 시스템에 설치할 수 있다. 다음 명령어를 사용하여 RabbitMQ 서비스를 시작한다.

```bash
docker run -it --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:4.0-management
```

서비스가 성공적으로 시작되었다면 시작 확인 메시지를 볼 수 있을 것이다.

```
2024-10-18 02:52:36.304715+00:00 [info] <0.651.0> Server startup complete; 4 plugins started.
2024-10-18 02:52:36.304715+00:00 [info] <0.651.0> * rabbitmq_prometheus
2024-10-18 02:52:36.304715+00:00 [info] <0.651.0> * rabbitmq_management
2024-10-18 02:52:36.304715+00:00 [info] <0.651.0> * rabbitmq_management_agent
2024-10-18 02:52:36.304715+00:00 [info] <0.651.0> * rabbitmq_web_dispatch
2024-10-18 02:52:36.409833+00:00 [info] <0.9.0> Time to start RabbitMQ: 12912 ms
```

서버와 상호작용하려면 RabbitMQ가 지원하는 메시징 프로토콜을 사용하는 클라이언트가 필요하다. 다행히도 Pika는 순수 Python으로 Advanced Message Queuing Protocol (AMQP)을 구현하는 패키지이다. AMQP는 네트워크 프로세스 간의 메시지 기반 통신을 정의하는 개방형 표준 애플리케이션 계층 프로토콜이다. 우리의 경우, 이는 Python 코드에서 Erlang으로 작성된 RabbitMQ 서버로 메시지를 보낼 수 있게 해준다. 별도의 터미널 창에서 다음 명령어를 실행하여 Pika를 설치한다.

다음으로, 큐에 메시지를 보내 작동하는지 확인할 수 있다. Pika를 사용하여 메시지를 보내기 전에 큐를 선언해야 한다. 다음 코드를 실행하면 로컬 메시지 서비스에 연결을 설정하고, flight_updates 큐에 단일 메시지를 발행하고, 연결을 닫는다.

Real-Time Machine Learning_MEAP_v01_p36

Page 1


--- Page 1 ---
다음은 코드를 실행한 결과이다.

보낸 메시지: {'icao24': 'abc123', 'origin_country': 'United States'}

Pika를 사용하여 큐에서 메시지를 소비하거나 구독할 수도 있다. 구독자 코드는 메시지로 무언가를 수행해야 하기 때문에 조금 더 복잡하다. 현재는 메시지를 출력하지만, 궁극적으로는 수신 데이터를 사용하여 온라인 추론을 수행할 것이다. 여기서는 구독자가 새 메시지를 받을 때마다 호출되는 간단한 콜백 함수(callback function)를 구성할 것이다.

데이터를 큐에 게시하기 (Listing 2.8)
#A 로컬에서 실행 중인 메시지 브로커에 연결한다.
#B 메시지 큐가 존재하는지 확인한다.
#C JSON 페이로드(payload)를 메시지 큐에 게시한다.
```python
import json
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters("localhost")) #A
channel = connection.channel() #A
channel.queue_declare(queue="flight_updates") #B
data = {"icao24": "abc123", "origin_country": "United States"}
channel.basic_publish(exchange="", routing_key="flight_updates", body=json.dumps(data)) #C
print("Sent message:", data)
connection.close()
```

큐에서 데이터를 소비하기 (Listing 2.9)
#A 큐에서 새 메시지를 받을 때 호출되는 함수 콜백(callback)이다.
#B 메시지 큐가 존재하는지 확인한다.
#C 소비자가 자동으로 확인 메시지를 보내도록 구성한다 (fire-and-forget 방식).
#D 큐에서 이벤트 소비를 시작한다.
```python
import json
import pika
def handle_message(channel, method, properties, body): #A
print(f"Received message: {body}") #A
connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
channel = connection.channel()
channel.queue_declare(queue="flight_updates") #B
channel.basic_consume( #C
queue="flight_updates", on_message_callback=handle_message, auto_ack=True #C
) #C
channel.start_consuming() #D
```

© Manning Publications Co. To comment go to liveBook

Real-Time Machine Learning_MEAP_v01_p37

Page 1


--- Page 1 ---
큐를 다시 선언해야 하는 이유가 궁금할 수 있다. 이미 퍼블리셔 측에서 선언했기 때문이다. 그 이유는 프로덕션 시스템에서 퍼블리셔와 구독자가 서로 다른 기계에서 작동할 가능성이 높기 때문이다. 보조 통신 채널이 없으면 어느 쪽도 상대방이 시작했는지 알 수 없다. 다행히 queue_declare 호출은 멱등성(idempotent)을 가지며, 여러 번 호출해도 하나의 큐만 생성된다. 따라서 존재 여부를 확인하기보다는 실수로 두 번 호출하는 것이 더 쉽다. 퍼블리시 코드를 이전에 실행했다면, 이 코드는 다음 메시지를 출력해야 한다.

수신된 메시지: b'{"icao24": "abc123", "origin_country": "United States"}'

이전과 동일하므로 퍼블리셔에서 구독자로 메시지를 큐를 통해 성공적으로 전송했다. 구독자 프로세스가 대기 상태에 있는 것을 주목해야 한다. 이는 버그처럼 보일 수 있지만 실제로 의도된 동작이다. start_consuming 함수는 새로운 메시지를 기다리는 루프를 구현한다. 이는 이벤트 기반 시스템의 일반적인 패턴이다. 구독자는 시스템에서 무언가가 발생하기를 기다린 후 새로운 데이터를 생성하거나 내부 상태를 업데이트하는 등의 작업을 수행한다. 퍼블리셔 코드를 다시 실행하면 구독자가 또 다른 메시지를 출력하는 것을 확인할 수 있다.

이제 큐에서 읽는 구성 가능한 비행 구독자를 만들 수 있다. subscribe.py라는 새 파일을 만들고 다음 코드를 포함한다.

Real-Time Machine Learning_MEAP_v01_p38

Page 1


--- Page 1 ---
구독자는 표준 파이썬 프로그램처럼 실행할 수 있다.

```python
python subscriber.py
```

발행자가 실행되지 않았기 때문에 아직 출력이 보이지 않을 것이다. 다음은 큐(queue)를 사용하는 업데이트된 FlightPublisher 버전으로, publisher.py로 저장할 수 있다.

Listing 2.10 메시지 큐를 사용하는 항공편 구독자

```python
#A 항공편 업데이트를 수신할 때 호출되는 콜백 함수.
#B 메시지 큐에서 이벤트를 지속적으로 수신하는 블로킹 함수.
#C 구독자를 실행한다.
import pika

class FlightSubscriberV1:
def __init__(self, queue_name):
self.queue_name = queue_name

def process_message(self, channel, method, properties, body): #A
print(f"Received flight update: {body}") #A

def run(self): #B
connection = pika.BlockingConnection(pika.ConnectionParameters("localhost")) #B
channel = connection.channel() #B
channel.queue_declare(queue=self.queue_name) #B
channel.basic_consume( #B
queue=self.queue_name, #B
on_message_callback=self.process_message, #B
auto_ack=True, #B
) #B
channel.start_consuming() #B

subscriber = FlightSubscriberV1(queue_name="flight_updates")
subscriber.run() #C
```

Listing 2.11 메시지 큐를 사용하는 항공편 발행자

```python
import json
import pika
import time
from urllib import request

class FlightPublisherV2:
def __init__(self, url, interval_sec, queue_name):
self.url = url
self.interval_sec = interval_sec
self.queue_name = queue_name

def response_to_events(self, api_response): #A
```

© Manning Publications Co. To comment go to liveBook

Real-Time Machine Learning_MEAP_v01_p39

Page 1


--- Page 1 ---
이제 두 번째 터미널 창에서 퍼블리셔를 실행한다.

API 응답을 항공편 업데이트 이벤트로 변환한다.
API를 폴링하고 항공편 업데이트 이벤트를 생성하는 제너레이터이다.
퍼블리셔를 무기한 실행하고 메시지 큐에 이벤트를 게시하는 함수이다.
시연을 위한 작은 지연이다.
퍼블리셔를 실행하는 블로킹 호출이다.

```python
flight_events = [] # API 응답을 항공편 업데이트 이벤트로 변환한다.
for update in api_response["states"]: # API 응답을 항공편 업데이트 이벤트로 변환한다.
flight_events.append( # API 응답을 항공편 업데이트 이벤트로 변환한다.
{ # API 응답을 항공편 업데이트 이벤트로 변환한다.
"icao24": update[0], # API 응답을 항공편 업데이트 이벤트로 변환한다.
"origin_country": update[2], # API 응답을 항공편 업데이트 이벤트로 변환한다.
"time_position": update[3], # API 응답을 항공편 업데이트 이벤트로 변환한다.
"longitude": update[5], # API 응답을 항공편 업데이트 이벤트로 변환한다.
"latitude": update[6], # API 응답을 항공편 업데이트 이벤트로 변환한다.
"velocity": update[9], # API 응답을 항공편 업데이트 이벤트로 변환한다.
"true_track": update[10], # API 응답을 항공편 업데이트 이벤트로 변환한다.
} # API 응답을 항공편 업데이트 이벤트로 변환한다.
) # API 응답을 항공편 업데이트 이벤트로 변환한다.
return sorted(flight_events, key=lambda x: x["time_position"]) # API 응답을 항공편 업데이트 이벤트로 변환한다.

def get_events(self): # API를 폴링하고 항공편 업데이트 이벤트를 생성하는 제너레이터이다.
while True: # API를 폴링하고 항공편 업데이트 이벤트를 생성하는 제너레이터이다.
with request.urlopen(self.url) as response: # API를 폴링하고 항공편 업데이트 이벤트를 생성하는 제너레이터이다.
yield from self.response_to_events( # API를 폴링하고 항공편 업데이트 이벤트를 생성하는 제너레이터이다.
json.loads(response.read().decode("utf-8")) # API를 폴링하고 항공편 업데이트 이벤트를 생성하는 제너레이터이다.
) # API를 폴링하고 항공편 업데이트 이벤트를 생성하는 제너레이터이다.
time.sleep(self.interval_sec) # API를 폴링하고 항공편 업데이트 이벤트를 생성하는 제너레이터이다.

def run(self): # 퍼블리셔를 무기한 실행하고 메시지 큐에 이벤트를 게시하는 함수이다.
connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
channel = connection.channel()
channel.queue_declare(queue=self.queue_name)
for event in self.get_events():
channel.basic_publish(
exchange="", routing_key=self.queue_name, body=json.dumps(event)
)
time.sleep(0.1) # 시연을 위한 작은 지연이다.
connection.close()

publisher = FlightPublisherV2(url="https://opensky-​network.org/​api/states/​all?lamin=45.​
8389&lomin=5.​9962&lamax=47.​8229&lomax=10.​5226", interval_sec=60,
queue_name="flight_updates")
publisher.run() # 퍼블리셔를 실행하는 블로킹 호출이다.
```

© Manning Publications Co. To comment go to liveBook

Real-Time Machine Learning_MEAP_v01_p40

Page 1


--- Page 1 ---
파이썬 publisher.py

출판자와 구독자가 동일한 순서로 동일한 이벤트를 출력하는 것을 볼 수 있다. 이제 실시간 데이터를 수집하고 이를 하류 구독자에게 스트리밍하는 기본 이벤트 기반 파이프라인을 만들었다. 이는 이전에 구현한 파일 버퍼 메커니즘보다 개선된 점이다. 구독자가 새로운 데이터가 언제 사용 가능한지 추측할 필요가 없기 때문이다. 대신 구독자는 진정한 이벤트 기반으로 새로운 비행 이벤트에 즉시 반응할 수 있다.

2.2.2 메시지 큐의 문제점
메시지 큐는 실시간 머신 러닝을 위한 비동기 데이터 전송을 구현하는 간단한 방법이다. 그러나 출판자를 중지하고 구독자를 다시 시작한 후 시도해보면 어떤 일이 발생할지 생각해보라.

이 경우 다시 시작된 구독자는 실행 중이지만 이전 메시지를 받지 못한다. 구독자가 모든 이전 메시지를 출력할 것으로 예상했을 수 있다. 사실, 이전 메시지는 큐에서 제거되어 더 이상 검색할 수 없다. 이는 메시지 큐의 주요 목표가 서비스 간 비동기 통신을 가능하게 하는 것이지 데이터를 지속하는 것이 아니기 때문이다. 엄격한 실시간 사용 사례에서는 이것이 바람직하지만, 실시간 머신 러닝에서는 역사적 데이터를 쿼리할 수 있기를 원하는 상황이 있다. 예를 들어, 데이터 스트림을 재생하여 온라인 모델을 평가하거나 디버그하고 싶을 수 있다. 메시지 큐를 사용하면 스트림을 되감거나 이전 값을 쿼리할 수 없다.

지금까지 본 예제에서는 모든 메시지가 출판자에서 구독자로 성공적으로 전달되었다. 그러나 비동기 통신에서는 메시지가 누락될 가능성이 항상 있다. 구독자가 메시지를 수신하려고 시도하는 동안 브로커와의 연결이 끊어져 출판자가 보낸 메시지를 받지 못할 수 있다. RabbitMQ는 큐에 대한 게시를 트랜잭션으로 만들어 구독자에게 전달되지 않은 메시지를 다시 보내는 책임을 출판자에게 부여하여 이를 해결한다.

이는 최소 한 번 전달의 예이다. 최소 한 번 전달은 구독자가 결국 이벤트를 수신하게 하지만 중복이 발생할 수 있다. 반면, 최대 한 번 전달은 중복이 수신되지 않도록 보장하지만 이 경우 메시지가 손실될 가능성이 있다. 이 두 가지 의미의 차이는 아래 그림에 설명되어 있다.

Real-Time Machine Learning_MEAP_v01_p41

Page 1


--- Page 1 ---
그림 2.6 최소 한 번 전달은 모든 메시지가 적어도 한 번 전달되도록 보장하지만 중복이 발생할 수 있다. 최대 한 번 전달은 중복을 방지하지만 메시지 전달에 실패할 수 있다.

비동기 메시징의 이상적인 의미론은 정확히 한 번 전달이지만, 이는 매우 특정한 시나리오를 제외하고는 달성할 수 없다. 이는 네트워크 통신의 제약을 설명하는 유명한 두 장군의 사고 실험과 관련이 있다. 이 사고 실험은 E. A. Akkoyunlu, K. Ekanadham, R. V. Huber가 1975년에 발표한 논문에서 처음 소개되었다. 이 시나리오는 신뢰할 수 없는 링크를 통해 통신하는 두 당사자가 행동을 조정하는 것이 불가능하다는 것을 보여준다. 구독자가 메시지가 제대로 수신되었음을 게시자에게 확인 메시지를 보내더라도, 그 확인 메시지 자체가 손실되지 않는다는 보장은 없다.

현실적으로, 엔지니어들은 중복 데이터 처리와 데이터 누락 중 하나를 선택해야 한다. 실시간 머신러닝에서는 정확한 사용 사례에 따라 달라진다. 금융 사기 탐지 시스템에서는 중요한 거래를 놓치는 것이 모델의 예측 정확성에 영향을 미친다면 최소 한 번 전달이 선호된다. 사용자 추천과 같은 덜 중요한 시나리오에서는 사용자에게 추천이 전달되는 속도가 가끔 데이터 포인트를 잃는 것보다 더 중요하다면 최대 한 번 전달이 허용된다. 비행 예측 시스템에서는 이벤트 누락이 바람직하지 않다. 이는 온라인 모델링을 위한 데이터가 적어지고 정확한 예측을 하고자 하기 때문이다. 다행히도 항공기 ID와 타임스탬프가 고유 ID를 형성하므로 중복을 감지하는 것은 어렵지 않다. 다음 함수는 FlightSubscriber에 추가하여 현재 비행 위치를 기록하고 이벤트가 이미 처리되었는지 확인할 수 있다.

Real-Time Machine Learning_MEAP_v01_p42

Page 1


--- Page 1 ---
메시지 큐는 임시 버퍼로 사용되기 때문에 크기가 제한되어 있다. 구독자가 발행자보다 느리게 작동하면 큐의 메시지 수가 계속 증가한다. 이러한 현상을 역압(backpressure)이라고 하며, 이는 데이터 처리량이 구독자의 처리 속도에 의해 제한되는 것을 의미한다. 이는 러시아워 동안 고속도로에 너무 많은 차가 있어 교통 속도가 느려지는 것과 유사하다.

역압을 관리하는 방법에는 몇 가지가 있다. 한 가지 방법은 발행자의 속도를 의도적으로 늦추는 것이다. 실시간 기계 학습에서는 이 옵션이 바람직하지 않을 수 있는데, 이는 더 이상 관련성이 없는 오래된 데이터를 수집하게 되기 때문이다. 또 다른 방법은 큐에 대한 압력을 줄이기 위해 메시지를 삭제하는 것이며, 이는 데이터 손실로 이어진다. 높은 처리량의 데이터 수집을 처리할 때 가장 좋은 방법은 구독자의 수를 늘리는 것이다. 물론, 이는 시스템의 복잡성을 증가시킨다.

같은 큐에서 여러 구독자가 읽도록 하는 기능적 이유가 있다. 예를 들어, 온라인 모델이 생성한 추론은 여러 사용자 인터페이스 애플리케이션에서 읽을 수 있다. 그러나 메시지는 소비되면 큐에서 제거되므로 두 개의 다른 구독자가 큐에서 동일한 이벤트를 받을 수 없다. 이러한 동작을 구현하기 위해서는 실시간 버퍼 메커니즘이 필요하며, 이는 역사적 로그로도 지속될 수 있어야 한다: 이벤트 스트림(event stream)이다.

중복 메시지 감지
#A 이 항공기에 대한 최신 이벤트가 아니라면 중복이다.
#B 이 항공기에 대한 최신 이벤트를 캐시에 저장한다.
def check_duplicate(self, event):
if (
event["icao24"] in self.flights
and event["time_position"] <= self.flights[event["icao24"]]["time_position"]
):
return True
self.flights[event["icao24"]] = event
return False

Real-Time Machine Learning_MEAP_v01_p43

Page 1


--- Page 1 ---
2.2.3 이벤트 스트림

이벤트 스트림은 실시간으로 소비 가능하고, 과거 데이터를 조회할 수 있도록 설계되었다. 실질적으로 이벤트 스트림은 순서가 있는, 추가만 가능한 이벤트 로그로 존재한다. 이는 여러 발행자와 구독자 간의 비동기 통신을 위한 유용한 데이터 구조이다. 이러한 방식으로 이벤트 스트림은 이벤트 데이터를 디스크에 저장함으로써 큐의 많은 제한 사항을 해결한다. 이벤트 스트림은 여러 발행자와 구독자 간의 이벤트 통신을 관리하는 이벤트 브로커(event brokers)라는 특수한 프로세스를 필요로 한다.

RabbitMQ 브로커는 이미 지속적인 데이터 스트림을 지원한다. 큐를 생성할 때 x-queue-type=stream 옵션을 추가하고, durable=True로 설정하여 브로커의 재부팅 시에도 스트림이 지속되도록 이벤트 스트림을 정의할 수 있다. 여기에는 새로운 이벤트 스트림에 비행 업데이트를 기록하는 업데이트된 발행자가 있다. flight_updates가 이미 durable=False로 정의되어 있기 때문에 브로커는 동일한 이름을 사용할 수 없도록 한다. 대신, flight_events라는 새로운 스트림을 생성할 것이다.

Listing 2.13 이벤트 스트림을 사용하는 비행 발행자

```python
import json
import pika
import time
from urllib import request

class FlightPublisherV3:
def __init__(self, url, interval_sec, stream_name):
self.url = url
self.interval_sec = interval_sec
self.stream_name = stream_name

def response_to_events(self, api_response):
flight_events = []
for update in api_response["states"]:
flight_events.append(
{
"icao24": update[0],
"origin_country": update[2],
"time_position": update[3],
"longitude": update[5],
"latitude": update[6],
"velocity": update[9],
"true_track": update[10],
}
)
return sorted(flight_events, key=lambda x: x["time_position"])

def get_events(self):
while True:
with request.urlopen(self.url) as response:
yield from self.response_to_events(response.read())
```

Manning Publications Co.의 저작권 ©. 의견을 남기려면 liveBook으로 이동하십시오.

Real-Time Machine Learning_MEAP_v01_p44

Page 1


--- Page 1 ---
`publisher.py`를 다시 실행하면 이전과 유사한 출력이 생성된다. 그러나 브로커 터미널에서 다른 출력을 볼 수 있으며, 이는 이벤트가 실제로 디스크에 저장되고 있음을 의미한다.

2024-10-19 15:41:28.879707+00:00 [info] <0.4238.0> rabbit_stream_coordinator: rabbit@6bcd5c37b84b에서 __flight_events_1729352488363061228의 writer가 1에서 시작되었다고 한다.

2024-10-19 15:41:28.879955+00:00 [info] <0.4239.0> Stream: __flight_events_1729352488363061228이 osiris 로그 데이터 디렉토리로 /var/lib/rabbitmq/mnesia/rabbit@6bcd5c37b84b/stream/__flight_events_1729352488363061228을 사용할 것이라고 한다.

2024-10-19 15:41:28.910821+00:00 [info] <0.4239.0> osiris_writer:init/1: 이름: __flight_events_1729352488363061228 마지막 오프셋: -1 커밋된 청크 ID: -1 에포크: 1이라고 한다.

퍼블리셔를 지금 중지한다. 이벤트가 게시되었는지 확인하기 위해 새로운 구독자가 이를 검색할 수 있는지 테스트할 것이다. 또한 구독자 동작을 구성할 몇 가지 옵션이 있다.

첫 번째는 구독자 확인이다. 이벤트 기반 시스템에서 브로커는 이벤트가 구독자에게 성공적으로 전달되었는지 알아야 하며, 이를 통해 이벤트를 발신 대기열에서 제거할 수 있다. 구독자는 이벤트가 성공적으로 처리되면 즉시 확인 신호를 보내야 한다. 큐를 사용할 때 이전에 auto_ack=True로 설정했다. 이는 브로커가 이벤트를 구독자에게 전달하자마자 삭제하는 가장 높은 처리량 동작을 초래했다(“발사 후 잊기” 방법).

#A API 응답을 항공편 이벤트로 변환한다.
#B API를 폴링하고 항공편 이벤트를 생성하는 제너레이터이다.
#C 브로커의 재부팅 시에도 지속되는 이벤트 스트림을 생성한다.
#D JSON 이벤트를 이벤트 스트림에 게시한다.
#E 퍼블리셔를 실행하여 이벤트 게시를 시작한다.

```python
json.loads(response.read().decode("utf-8")) #B
) #B
def run(self):
connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
channel = connection.channel()
channel.queue_declare(
queue=self.stream_name,
durable=True, #C
arguments={"x-queue-type": "stream"}
)
for event in self.get_events():
print("Sending flight update:", event)
channel.basic_publish( #D
exchange="", routing_key=self.stream_name, body=json.dumps(event) #D
) #D
connection.close()
publisher = FlightPublisherV3(url="https://opensky-​network.org/​api/states/​all?lamin=45.​
8389&lomin=5.​9962&lamax=47.​8229&lomax=10.​5226", interval_sec=60,
stream_name="flight_events")
publisher.run() #E
```

39 © Manning Publications Co. To comment go to liveBook

Real-Time Machine Learning_MEAP_v01_p45

Page 1


--- Page 1 ---
그러나 구독자가 이벤트를 처리할 수 없는 여러 시나리오가 있다. 예를 들어, JSON 이벤트가 정의된 스키마에 대해 유효성 검사를 실패할 수 있다(예: 필수 키가 누락된 경우). 또한, 구독자가 처리할 수 없는 사전 조건 실패로 인해 이벤트를 수신할 수도 있다(예: 구독자가 로컬 데이터베이스를 업데이트해야 하지만 데이터베이스가 아직 준비되지 않은 경우). 이러한 경우, 구독자는 브로커에게 이벤트를 삭제하거나 다시 큐에 넣어야 한다고 알리는 부정적 승인(negative acknowledgement)을 보내야 한다. 따라서 auto_ack=False로 설정하고 구독자가 수동 승인을 보내도록 변경하는 것이 더 안전하다.

다른 옵션은 처리량과 관련이 있다. RabbitMQ 소비자는 스트림의 역압(backpressure)을 완화하기 위해 구독자에서 이벤트를 처리하기 전에 미리 가져온다(prefetch). 무제한 미리 가져오기는 가장 높은 처리량을 허용하지만 구독자를 압도할 위험이 있다. 구독자가 이벤트를 승인하지 못하면 미리 가져온 이벤트가 구독자에 쌓이게 된다.

그림 2.8 구독자에서 이벤트를 미리 가져와 데이터 처리량 증가

미리 가져오기 수(prefetch count)는 일반적으로 처리량과 데이터 안전 요구 사항에 따라 실험적으로 결정된다. 우리의 경우, 60초마다 100개 미만의 이벤트만 수집하므로 높은 처리량이 필요하지 않다. 가장 보수적인 값인 1로 이 값을 설정할 것이다. 여기 새로운 이벤트 스트림에서 소비하고 데이터 안전성을 보장하기 위해 중복 검사를 통합한 업데이트된 FlightSubscriber가 있다.

Listing 2.14 이벤트 스트림을 사용하는 Flights 구독자
```python
import json
import pika
class FlightSubscriberV2:
def __init__(self, stream_name):
self.stream_name = stream_name
self.flights = {}
def check_duplicate(self, event):
if (
event["icao24"] in self.flights
and event["time_position"] <= self.flights[event["icao24"]]["time_position"]
):
return True
```

Real-Time Machine Learning_MEAP_v01_p46

Page 1


--- Page 1 ---
새로운 구독자를 실행해 보아라. 놀랍게도, 발행된 이벤트를 전혀 받지 못한다. 이제 발행자를 다시 실행해 보아라.

비행 업데이트를 받았다: {'icao24': '4ba975', 'origin_country': 'Turkey', 'time_position': 1729359358, 'longitude': 6.1143, 'latitude': 46.2395, 'velocity': 8.23, 'true_track': 45}

이번에는 구독자가 데이터를 받았지만, 단 하나의 이벤트만 받았다. prefetch_count를 2로 설정하고 구독자를 다시 실행해 보아라. 그런 다음, 발행자를 재시작하여 새로운 이벤트를 생성해 보아라.

비행 업데이트를 받았다: {'icao24': '4a3121', 'origin_country': 'Romania', 'time_position': 1729359386, 'longitude': 9.803, 'latitude': 47.734, 'velocity': 171.92, 'true_track': 268.11}

비행 업데이트를 받았다: {'icao24': '398579', 'origin_country': 'France', 'time_position': 1729359559, 'longitude': 9.7242, 'latitude': 46.6355, 'velocity': 217.26, 'true_track': 125.46}

이제 구독자는 두 개의 새로운 이벤트만 받았다. 무슨 일이 일어나고 있는지 이해하기 위해 이벤트의 순서를 고려해 보아라.

1. 발행자가 시작하여 브로커에 몇 가지 이벤트를 발행한다.
#A 중복 이벤트를 확인하기 위한 함수 콜백.
#B 이벤트를 소비할 때 최대 1개의 이벤트를 미리 가져온다.
#C 중복을 확인하기 위해 콜백을 구성한다.
#D 자동 확인을 비활성화한다.
#E 구독자를 실행하여 스트림에서 이벤트 소비를 시작한다.
self.flights[event["icao24"]] = event
return False
def process_message(self, channel, method, properties, body): #A
event = json.loads(body) #A
if not self.check_duplicate(event): #A
print(f"Received flight update: {event}") #A
def run(self):
connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
channel = connection.channel()
channel.queue_declare(
queue=self.stream_name,
durable=True,
arguments={"x-queue-type": "stream"}
)
channel.basic_qos(prefetch_count=1) #B
channel.basic_consume(
queue=self.stream_name,
on_message_callback=self.process_message, #C
auto_ack=False #D
)
channel.start_consuming()
subscriber = FlightSubscriberV2(stream_name="flight_events")
subscriber.run() #E

Real-Time Machine Learning_MEAP_v01_p47

Page 1


--- Page 1 ---
2. 구독자는 prefetch_count=1로 시작하고 이벤트를 받지 않는다.
3. 발행자가 재시작하고 브로커에 더 많은 이벤트를 발행한다. 구독자는 하나의 이벤트를 받는다.
4. 구독자는 prefetch_count=2로 재시작하고 이벤트를 받지 않는다.
5. 발행자가 재시작하고 브로커에 더 많은 이벤트를 발행한다. 구독자는 2개의 이벤트를 받는다.
여기서 구독자는 발행자가 재시작할 때만 새로운 이벤트를 소비한다. 이는 큐와 달리, 브로커에 이전에 발행된 메시지가 새로운 구독자에게 자동으로 전달되지 않기 때문이다. 대신, 새로운 구독자는 마지막으로 발행된 오프셋에서 스트림을 읽기 시작한다. 구독자가 이전 이벤트를 읽도록 하려면 x-stream-offset 옵션을 다른 값으로 설정해야 한다. 예를 들어, 구독자를 스트림의 시작부터 시작하도록 설정할 수 있다.

두 번째 질문은 왜 구독자가 3단계에서 하나의 이벤트만 받고 5단계에서 두 개의 이벤트를 받는가이다. 그 이유는 구독자가 이벤트를 인식하도록 설정하지 않았기 때문에 prefetch 큐가 가득 차기 때문이다. 스트림의 나머지를 소비하려면 구독자가 각 이벤트가 제대로 처리된 후 브로커에 인식해야 한다. 아래의 목록에 표시된 대로이다.

이 업데이트를 한 후, 구독자를 재시작하라. 이제 구독자가 모든 이벤트를 소비하는 것을 볼 수 있다. 모든 이벤트가 수신되었는지 확인하려면 마지막 몇 개의 이벤트를 확인하라. 이는 마지막 발행자 실행과 동일해야 한다. 또한 구독자 출력에서 이전과 동일한 이벤트를 소비하는지 확인할 수 있다.

Listing 2.15 스트림의 시작부터 소비하기
#A 이벤트를 소비할 때 첫 번째 오프셋에서 시작한다.
```python
channel.basic_consume(
queue=self.stream_name,
on_message_callback=self.process_message,
arguments={"x-stream-offset": "first"} #A
)
channel.start_consuming()
```

Listing 2.16 구독자로부터의 인식 전송
#A 이벤트가 성공적으로 처리되었음을 나타내는 이벤트 인식.
```python
def process_message(self, channel, method, properties, body):
event = json.loads(body)
if not self.check_duplicate(event):
print(f"Received flight update: {event}")
channel.basic_ack(delivery_tag=method.delivery_tag) #A
```

42
© Manning Publications Co. To comment go to liveBook

Real-Time Machine Learning_MEAP_v01_p48

Page 1


--- Page 1 ---
비행 업데이트 수신: {'icao24': '4a3121', 'origin_country': 'Romania', 'time_position': 1729359386, 'longitude': 9.803, 'latitude': 47.734, 'velocity': 171.92, 'true_track': 268.11}

비행 업데이트 수신: {'icao24': '4b18fc', 'origin_country': 'Switzerland', 'time_position': 1729359430, 'longitude': 8.5584, 'latitude': 47.4614, 'velocity': 0, 'true_track': 275.62}

비행 업데이트 수신: {'icao24': '70c0cd', 'origin_country': 'Oman', 'time_position': 1729359447, 'longitude': 8.5553, 'latitude': 47.4608, 'velocity': 0, 'true_track': 5.62}

비행 업데이트 수신: {'icao24': '398579', 'origin_country': 'France', 'time_position': 1729359559, 'longitude': 9.7242, 'latitude': 46.6355, 'velocity': 217.26, 'true_track': 125.46}

이벤트 스트림의 강력한 점 중 하나는 여러 발행자와 구독자로 확장할 수 있다는 것이다. 새로운 터미널 창을 열고 구독자의 새 복사본을 시작한 후 발행자를 다시 시작하여 이를 확인할 수 있다. 60초 후 발행자는 새로운 이벤트 시리즈를 생성하고 각 구독자는 동일한 출력을 출력하여 둘 다 새로운 이벤트를 소비했음을 나타낸다. 기본 메시지 큐에서는 이벤트가 소비되면 큐에서 제거되기 때문에 이는 불가능했다.

이벤트 기반 시스템은 다양한 이벤트 스트림을 통해 통신하는 많은 발행자와 구독자로 구성된다. 이벤트 스트림 자체는 때때로 주제(토픽)라고도 한다. 지금까지 실시간 비행 데이터를 캡처하는 단일 주제를 다루어 왔다. 그러나 실시간 머신러닝 시스템에서는 특정 데이터 클래스에 전념하는 여러 주제가 있을 것이다. 다음 섹션에서는 실시간 머신러닝 시스템을 구축하기 위해 주제 측면에서 데이터 워크플로를 구성하는 방법을 탐구할 것이다.

2.3 머신러닝을 위한 이벤트 기반 아키텍처의 이점 이제 데이터 수집 파이프라인을 구축하기 위해 이벤트 기반 아키텍처를 사용하는 방법을 설명했으므로, 머신러닝 애플리케이션을 구축하는 데 특히 적합한 이유에 대해 간단히 설명하겠다.

일반적으로 머신러닝 제품을 구축하는 접근 방식은 데이터 배치를 통해 모델을 훈련한 다음, 모델을 프로덕션 소프트웨어 애플리케이션에 통합하여 추론을 수행하는 것이다. 추론을 생성하는 방법에는 여러 가지가 있다. 한 가지 방법은 훈련 과정과 유사하게 배치 방식으로 오프라인에서 추론을 사전 계산하는 것이다. 즉, 정적이고 유한한 데이터 세트에서 추론을 생성하는 것이다. 이러한 상황에서는 시기적절함이 보통 중요한 요소가 아니다. 예로는 주택 가격 예측, 고객 생애 가치, 소매점의 상품 수요 예측 등이 있다.

그러나 시간이 지남에 따라 더 많은 활동이 온라인에서 이루어지고 있다. 불과 몇십 년 전만 해도 CD로 음악을 듣고 영화와 TV 프로그램의 DVD를 대여하거나 구매했다. 네트워크와 케이블 TV가 프로그램의 일정을 통제했고, 많은 금융 거래가 대면으로 이루어졌다. 오늘날 우리는 온라인으로 음악을 듣고 콘텐츠를 시청한다. Netflix와 같은 서비스에서 프로그램을 시청할 시간을 우리가 통제한다. 우리는 온라인으로 더 많은 금융 거래를 수행하고 Uber와 같은 차량 공유 앱을 호출하며 Doordash와 같은 온라인 배달 서비스를 통해 음식을 주문한다.

Real-Time Machine Learning_MEAP_v01_p49

Page 1


--- Page 1 ---
위 내용은 서비스 제공의 적시성에 대한 필요성이 증가하고 있음을 강조한다. 이는 실시간 애플리케이션에서 중요한 구성 요소인 기계 학습(machine learning)에도 영향을 미친다. 실시간으로 추론을 생성하는 것이 필수적인 많은 사용 사례가 있다. 이전 장에서 몇 가지 예를 다루었다. 혜택을 논의하기 전에, 현재 기계 학습 애플리케이션이 어떻게 배포되는지에 대해 이야기하겠다.

2.3.1 동기식 기계 학습
기계 학습 애플리케이션은 특수한 유형의 소프트웨어 애플리케이션이므로, 전통적인 소프트웨어 애플리케이션과 동일한 프레임워크에 기계 학습을 포함시키는 것이 합리적이다. 이는 일반적으로 기계 학습 모델과 새로운 데이터에 대한 추론을 생성하는 데 사용되는 코드인 추론 엔진이 마이크로서비스의 구성 요소로 제공된다는 것을 의미한다. 종종 이 마이크로서비스는 REST 애플리케이션의 일부이다. 이는 하나의 마이크로서비스가 기계 학습 마이크로서비스에 특징을 제공하여 요청을 하고, 기계 학습 마이크로서비스가 모델을 로드하고 추론을 생성하여 응답 형태로 추론을 반환하는 것을 포함한다. 그림 2.9는 이러한 상호작용의 예를 보여준다. 이 예에서는 고객이 레스토랑에 대한 리뷰를 게시할 수 있는 웹 애플리케이션이 있다. 리뷰는 백엔드 서비스로 전송되어 고객 리뷰 감정 예측기로 라우팅되며, 이는 리뷰의 감정(긍정적 또는 부정적)을 생성할 모델을 로드하고 이를 백엔드 서비스에 응답으로 반환하는 마이크로서비스이다. 응답을 받은 후, 리뷰와 감정은 고객 서비스 담당자가 고객 리뷰를 분류하는 데 사용하는 다른 애플리케이션인 고객 지원 포털로 전달된다.

Real-Time Machine Learning_MEAP_v01_p50

Page 1


--- Page 1 ---
그림 2.9 고객 리뷰 감성 분석 애플리케이션의 동기식 아키텍처 사용

위의 상호작용의 본질은 동기식이다. 요청을 제출하는 서비스는 응답을 받을 때까지 기다린 후 애플리케이션 워크플로의 다음 단계로 진행한다. 서비스가 응답을 받을 때까지 기다려야 한다는 사실은 애플리케이션에 약간의 지연을 초래하며, 이는 일부 사용 사례에서는 용납되지 않을 수 있다.

지연은 다른 방식으로도 시스템에 들어올 수 있다. 예측 요청이 예측 엔진에 과도하게 몰리면, 처리할 수 있는 양을 초과하여 역압(backpressure)을 발생시켜 응답이 느려진다. 일부 애플리케이션 설계자는 서버 부하를 관리하기 위해 요청 유입을 관리하는 속도 제한(rate limiting)을 적용할 수 있지만, 이는 서비스가 요청을 단순히 거부하게 된다는 것을 의미한다. 어느 상황도 이상적이지 않으며, 사용자 경험이 저하될 수 있다.

Real-Time Machine Learning_MEAP_v01_p51

Page 1


--- Page 1 ---
그림 2.10 고객 리뷰 감성 분석 애플리케이션의 동기식 아키텍처 사용

위의 내용은 특징을 전송하는 서비스와 추론을 생성하고 반환하는 서비스 간에 긴밀한 결합이 있음을 강조한다. 머신러닝 마이크로서비스의 부하는 얼마나 많은 요청이 전송되는지에 따라 영향을 받을 수 있으며, 요청하는 마이크로서비스의 성능은 머신러닝 마이크로서비스의 응답성에 의존한다.

이전에 제시된 고객 리뷰 애플리케이션은 머신러닝 마이크로서비스를 호출하는 애플리케이션이 추론을 받아 다른 애플리케이션에 전달하여 고객 서비스 담당자가 리뷰를 분류하는 시나리오를 보여준다. 만약 여러 애플리케이션에서 추론이 필요하다면 어떻게 될까? 예를 들어, 추론은 비즈니스 인텔리전스 목적으로 사용되는 분석을 표시하는 대시보드의 일부일 수 있다. 추론과 특징은 오프라인 분석을 위해 데이터베이스에 저장될 수도 있다. 워크플로우의 어느 부분에서 병목 현상이 발생하면, 그 영향은 전체 시스템이 실패하는 연쇄 반응을 일으킬 수 있다.

이 의존성에는 다른 문제도 있다. 머신러닝 마이크로서비스가 API 변경을 해야 한다면, 이에 의존하는 모든 다른 마이크로서비스도 코드를 변경해야 한다. 많은 조직에서 머신러닝 마이크로서비스를 개발한 팀은 이를 호출하는 팀과 다르다. 이들 팀은 일반적으로 다른 목표, 우선순위, 출시 일정을 가지고 있다. 그 결과는 많은 조직에서 발생하는 의사소통 문제로, 머신러닝 프로젝트가 실패할 수 있다.

Real-Time Machine Learning_MEAP_v01_p52

Page 1


--- Page 1 ---
이 모든 것은 중요한 점을 강조한다: 반응형 애플리케이션을 구축하기 위해서는 데이터에 대한 빠른 접근이 핵심이다. 데이터는 실시간 시스템에서 돈과 유사한 가치를 가진다. 오늘의 1달러가 미래의 1달러보다 더 가치가 있는 것처럼, 실시간 머신 러닝 애플리케이션에서 데이터가 더 빨리 이용 가능하고 처리되어 실행될수록 조직에 더 많은 가치를 제공한다. 예를 들어, 사용자 활동에 기반하여 실시간으로 추천을 조정할 수 있는 애플리케이션은 전날의 데이터로 학습되어 조정할 수 없는 애플리케이션보다 사용자를 더 잘 유지하고 다시 방문하게 할 가능성이 높다. 마찬가지로, 애플리케이션이 사기성 금융 거래를 더 빨리 감지하고 표시할 수 있을수록 금융 기관이 시간이 지남에 따라 더 많은 돈을 절약할 가능성이 높다.

2.3.2 이벤트 기반 머신 러닝
이 장의 앞부분에서 데이터 신선도 문제에 대한 해결책으로 실시간 데이터 흐름과 이벤트 기반 아키텍처의 개념을 소개했다. 그림 2.11은 이벤트 기반 아키텍처를 사용하는 애플리케이션의 예를 보여준다. 고객 리뷰 애플리케이션이 고객 리뷰 데이터를 고객 리뷰 주제로 전달한다. 머신 러닝 모델을 포함하는 고객 리뷰 감정 예측 애플리케이션은 새로운 고객 리뷰가 고객 리뷰 주제에 추가되면 즉시 추론을 생성하고 이를 고객 감정 주제에 게시한다. 고객 리뷰 감정 예측 애플리케이션은 하나의 주제를 구독하고(데이터를 수신) 다른 주제에 게시하는 "핸들러" 애플리케이션으로 간주할 수 있다.

그림 2.11 이벤트 기반 아키텍처를 사용하는 고객 리뷰 감정 분석 애플리케이션

Real-Time Machine Learning_MEAP_v01_p53

Page 1


--- Page 1 ---
이전 예시와의 주요 차이점은 고객 리뷰 데이터를 수집하는 고객 리뷰 애플리케이션이 고객 리뷰 감정 분석 애플리케이션과 독립적으로 작동한다는 것이다. 이 애플리케이션은 단순히 데이터를 발행하는 역할을 한다. 감정 분석 애플리케이션은 고객 리뷰 주제에 구독하고 주제에 저장된 데이터를 처리하여 고객 감정 주제에 추론 결과를 생성하고 발행하는 책임이 있다. 고객 리뷰 애플리케이션은 더 이상 추론 결과를 수집하고 모든 후속 작업을 수행할 필요가 없기 때문에 감정 분석 애플리케이션의 내부를 알 필요가 없다. 사실, 이 다이어그램은 다른 애플리케이션이 후속 작업을 처리하도록 설정할 수 있는 방법도 보여준다. 이 예시에서는 감정 분석 애플리케이션이 데이터를 발행하는 고객 감정 주제에 구독된 두 개의 애플리케이션이 있다. 하나는 고객 지원 포털을 운영하여 고객 지원 인력이 리뷰를 분류할 수 있도록 한다. 두 번째 애플리케이션은 고객 리뷰와 모델 출력을 분석하여 비즈니스 인텔리전스 목적으로 사용할 수 있는 분석 포털을 생성한다. 이 시나리오에서 다른 두 애플리케이션은 감정 분석 애플리케이션과 직접 대화하지 않는다.

이 아키텍처의 또 다른 이점은 주제의 구독자가 원하는 간격으로 데이터를 소비할 수 있다는 것이다. 따라서 구독자는 SLA가 즉각적인 처리를 요구하는 경우 실시간 이벤트를 선택할 수 있으며, 실시간으로 이벤트를 수집할 필요가 없는 경우 나중에 이벤트를 수집할 수 있다.

이벤트 기반 아키텍처는 애플리케이션 유지보수를 개선한다. 이러한 애플리케이션이 분리되어 있기 때문에 각 팀은 자체 유지보수 주기를 선택할 수 있으며, 코드 변경이 다른 팀에 영향을 미칠 걱정을 할 필요가 없다.

이벤트 기반 애플리케이션은 데이터 접근의 민주화를 통해 데이터 사일로 문제를 해결한다. 동기적 패러다임에서는 소비 애플리케이션이 데이터를 생성하는 애플리케이션의 API 사양을 따라야 한다. 그러나 애플리케이션이 데이터를 수신하기 위해 주제에 구독만 하면 되고 데이터 생성자를 호출하는 방법에 대해 걱정할 필요가 없다면, 애플리케이션 설계에 훨씬 더 많은 유연성이 생기고 소비 애플리케이션은 데이터 생성자와 다른 프로그래밍 언어로 작성될 수 있다. 이는 조직 내 여러 팀이 각자의 배경에 따라 데이터를 활용할 수 있는 가능성을 열어주며, 이는 조직에 더 많은 가치 흐름을 창출한다.

이벤트 기반 아키텍처는 데이터 접근을 방해하고 머신러닝 애플리케이션 개발을 지연시키는 또 다른 문제를 해결한다: 경험이 부족한 팀원이 데이터를 실수로 삭제할 수 있다는 두려움이다. 전통적인 관계형 데이터베이스와 달리, 데이터는 업데이트되거나 삭제될 수 없으며, 이벤트는 불변이며 단순히 주제에 추가된다. 이는 데이터에 대한 읽기 전용 접근을 제공하는 것과 유사하다. 소비 애플리케이션은 사용 사례에 따라 이벤트를 처리해야 한다. 또한, 특정 데이터 요소가 민감하고 제한이 필요한 경우, 민감한 요소를 마스킹하거나 제거하고 이벤트를 다른 주제에 발행하여 조직 내 누구나 사용할 수 있도록 하는 애플리케이션을 생성할 수 있다. 반면, 관계형 데이터베이스에서는 지속적으로 업데이트해야 하는 복잡한 행/열 기반 데이터베이스 접근 정책이 필요하거나, 여러 사용자를 위해 동일한 데이터 소스의 여러 복사본을 생성하고 유지보수해야 한다.

Real-Time Machine Learning_MEAP_v01_p54

Page 1


--- Page 1 ---
이벤트 기반 아키텍처가 애플리케이션 유지보수를 어떻게 더 잘 지원하는지 간단히 언급하였다. 이제 이를 머신러닝의 맥락에서 확장하여 설명할 것이다. 머신러닝 애플리케이션을 유지보수하는 것은 소프트웨어 애플리케이션을 유지보수하는 것보다 훨씬 복잡하다. 예를 들어, 모델 결과의 재현성은 머신러닝 모델을 관리하는 데 중요한 측면이다. 데이터는 일반적으로 최신 스냅샷만 저장하는 관계형 데이터베이스에 저장된다. 이는 머신러닝 모델을 훈련하는 데 사용된 데이터를 캡처하고 버전 관리하는 솔루션을 설계하는 것이 간단한 작업이 아님을 의미한다. 이벤트 스트리밍 플랫폼에서는 이벤트 내의 데이터가 불변이며 이러한 이벤트는 완전히 순서가 정해져 있다. 이는 애플리케이션이 모델 훈련에 사용된 데이터의 시작과 끝 오프셋(offset)만 저장하면 된다는 것을 의미한다. 아래의 그림 2.12는 모델의 v1이 오프셋 0에서 5까지의 데이터로 훈련되었고, v2 버전의 모델이 오프셋 6에서 9까지의 데이터로 훈련된 예를 보여준다.

그림 2.12 오프셋을 사용하여 모델 훈련 추적

이 기능의 하위 이점은 서로 다른 모델 버전을 비교하는 것이 훨씬 간단해진다는 것이다. 데이터가 항상 사용 가능하기 때문에 사후 분석이 필요할 때 더 쉽게 수행할 수 있다. 예로는 모델 매개변수, 구성, 평가 지표, 데이터 분포, 특징과 목표 간의 관계, 특징 중요도 등을 비교하는 것이 포함된다. 한 걸음 더 나아가 훈련 데이터와 추론에 사용된 데이터를 비교하여 운영 중 문제를 해결할 수 있다. 실제로 훈련에 사용된 특징과 추론에 사용된 특징이 동일한 토픽에 저장되면, 동일한 코드를 사용하여 훈련 및 추론 데이터를 수집할 수 있어 머신러닝 애플리케이션을 배포하는 데 걸리는 시간이 줄어든다.

CPU 및 메모리 사용량과 같은 애플리케이션 상태 메트릭을 캡처하고 분석하는 것도 관련 토픽에 이벤트로 게시될 때 더 쉬워진다. 이는 예기치 않은 문제가 발생할 때 실시간 경고를 받을 수 있는 가능성을 열어준다.

Real-Time Machine Learning_MEAP_v01_p55

Page 1


--- Page 1 ---
기계 학습 응용 프로그램의 또 다른 구성 요소는 모델 평가와 재훈련을 위한 사용자 피드백 수집이다. 그림 2.13은 프론트 엔드 애플리케이션이 추론 주제에 구독하고 모델의 결과를 표시하는 예를 보여준다. 모든 부정적인 고객 리뷰를 포함하는 애플리케이션이라고 가정하자. 고객 서비스 담당자가 분류해야 하는 리뷰이다. 각 결과 옆에는 "정확함" 또는 "부정확함"이라는 버튼이 있어 고객 서비스 담당자가 선택적으로 클릭하여 피드백을 제공할 수 있다. 이 피드백은 "피드백" 주제에 캡처되어 모델 성능을 평가하고 재훈련을 위한 데이터로 사용될 수 있다.

그림 2.13 이벤트 기반 아키텍처를 사용하여 사용자 피드백 캡처하기

위의 예는 이벤트 기반 아키텍처가 배치 기계 학습 모델을 사용하는 반응형, 저지연 실시간 애플리케이션을 구축하는 것을 어떻게 더 쉽게 만드는지를 보여준다.

이벤트 기반 아키텍처가 실시간 기계 학습 모델을 사용하는 애플리케이션을 구축하는 데 중요한 구성 요소라는 주장도 무리가 아니다. 설계상, 실시간 기계 학습 모델은 먼저 예측을 하고, 레이블이 사용 가능할 때 학습한다. 실시간 맥락에서는 레이블이 언제 사용 가능할지 제어할 수 없다. 기계 학습 모델이 특정 사용자에게 기사 세트를 제시하기로 결정하는 예를 고려해보자. 이 사용 사례의 레이블은 사용자가 이 기사를 관련성이 있다고 판단했는지 여부이며, 관련성을 판단하는 방법은 사용자가 기사를 클릭했는지 여부이다. 사용자 클릭은 이 클릭 이벤트와 기사를 "레이블" 주제에 저장하는 프로세스를 시작할 수 있는 "이벤트"이다. 다른 쪽 끝에는 이 정보를 기계 학습 모델에 전달하는 구독자가 있을 수 있다. 또한 사용자의 페이지에 기사가 있는 시간을 모니터링하고 사용자가 일정 시간 내에 기사에 대해 아무런 조치를 취하지 않으면 "관련 없음" 이벤트를 "레이블" 주제에 게시하는 애플리케이션도 있을 수 있다. 이 모든 것은 아래 그림 2.14에 제시되어 있다.

Real-Time Machine Learning_MEAP_v01_p56

Page 1


--- Page 1 ---
그림 2.14 실시간 머신러닝 애플리케이션 워크플로우

위의 예시에서 보듯이, 전통적인 소프트웨어 아키텍처 패턴은 마이크로서비스 간의 긴밀한 결합을 포함하여 실시간 애플리케이션에 적합하지 않다. 반면에 이벤트 기반 아키텍처는 데이터 수집, 추론 생성, 메트릭 수집, 사용자 피드백과 같은 다양한 작업을 원활하게 조정하여 낮은 지연 시간과 반응성이 뛰어난 머신러닝 애플리케이션을 구축할 수 있게 한다.

이 장에서는 메시지 큐잉 솔루션을 사용하여 이벤트 스트림에서 데이터를 수집하는 방법을 배웠다. 또한 실시간 데이터를 생성하는 퍼블리셔(publisher)와 실시간 데이터를 소비하는 구독자(subscriber)를 만드는 방법도 배웠다. 3장에서는 이벤트 스트림의 데이터를 사용하여 실시간으로 추론을 생성할 뿐만 아니라 데이터가 사용 가능해지자마자 지속적으로 학습하는 나우캐스팅 모델(nowcasting model)을 구축하는 방법에 집중할 것이다.

2.4 요약

실시간 머신러닝은 과거 데이터와 실시간 데이터 모두에 대한 접근이 필요하다.
이벤트는 특정 시간에 무언가가 발생했다는 기록이다.
폴링 간격은 머신러닝을 위해 캡처되는 실시간 데이터의 양에 영향을 미친다.
메시지 큐는 서로 다른 프로세스 간의 간단한 비동기 통신을 가능하게 한다.
퍼블리셔 또는 프로듀서(producer)는 이벤트를 생성하는 프로세스이다.
구독자 또는 소비자(consumer)는 이벤트를 구독하는 프로세스이다.
동기 데이터 전송은 두 당사자 간의 직접적인 조정을 필요로 한다.
비동기 데이터 전송은 게시된 데이터를 저장할 버퍼가 필요하다.
백프레셔(backpressure)는 구독자가 퍼블리셔보다 이벤트를 더 느리게 처리하여 전체 데이터 처리량을 감소시키는 현상이다.
최소 한 번 전달(at least once delivery)은 모든 메시지가 최소 한 번 전달되도록 보장하지만 중복 메시지가 전송될 수 있다.
최대 한 번 전달(at most once delivery)은 중복을 방지하지만 메시지가 누락될 수 있다.
이벤트 스트림은 이벤트의 추가 전용 순서 로그이며 때때로 토픽(topics)이라고도 한다.
이벤트 브로커(event broker)는 데이터를 이벤트 스트림에 지속시켜 많은 퍼블리셔와 구독자 간의 이벤트 전달을 조정한다.

Real-Time Machine Learning_MEAP_v01_p57

Page 1


--- Page 1 ---
구독자는 데이터를 처리하기 전에 이벤트를 미리 가져와 데이터 처리량을 증가시킬 수 있지만, 이는 구독자를 과부하 상태로 만들 수 있다. 이벤트 스트림은 실시간으로 소비되거나 과거 데이터를 조회할 수 있다. 이벤트 기반 아키텍처는 데이터 수집이 추론 및 사용자 피드백 수집과 같은 다른 중요한 머신러닝 프로세스와 동시에 작동할 수 있게 한다.

반응형