반응형
반응형
반응형

이 장에서는 다음을 다룬다:

 

  • 실시간 이벤트 데이터 수집
  • 이벤트 기반 아키텍처를 사용하여 실시간 데이터를 스트리밍하고 저장
  • 실시간 머신러닝을 위한 이벤트 기반 아키텍처의 장점

모든 머신러닝 프로젝트는 데이터에서 패턴을 학습하는 머신러닝 모델을 사용하여 해결할 수 있는 문제로 시작한다. 사용할 수 있는 데이터와 수집할 수 있는 데이터는 프로젝트의 가능성을 정의한다. 실시간 머신러닝도 다르지 않다. 실시간 머신러닝 애플리케이션을 구축하기 시작하려면 사용할 수 있는 실시간 데이터가 무엇인지 이해하고 이를 사용하여 유용한 예측을 생성하는 방법을 알아야 한다.

이전 장에서는 실시간 데이터 인스턴스가 실시간 추론을 생성하고 온라인 모델을 훈련하는 데 어떻게 사용되는지 탐구했다. 이를 달성하기 위해서는 데이터가 먼저 원본에서 추론이 이루어지는 프로세스로 수집되고 전송되어야 한다. 이는 데이터 스트림으로 알려진 지속적인 데이터 전송을 처리하기 위한 전용 데이터 아키텍처가 필요하다. 실제로 데이터 스트림은 메시지 큐 또는 이벤트 스트림으로 구현된다. 메시지 큐와 이벤트 스트림은 2.2절에서 논의한다.

이 장에서는 실시간 데이터 소스에서 데이터 포인트를 수집하는 방법을 배운다. 그런 다음, 이 데이터를 이벤트 스트림에 저장하는 퍼블리셔를 작성하고, 스트림에서 실시간 및 과거 데이터를 읽는 구독자를 작성한다. 퍼블리셔와 구독자는 2.2절에서 논의할 것이다. 또한, 이벤트 기반 아키텍처가 실시간 데이터 스트리밍과 관련된 역압(backpressure) 및 확장성 문제를 해결하는 방법을 배운다. 이 장의 끝에서는 실시간 머신러닝을 위한 데이터 수집 메커니즘을 준비하게 될 것이다.

 

그림 2.1은 이벤트 기반 아키텍처를 사용하여 실시간 데이터 흐름을 설명하는 간단한 워크플로우 예시를 보여준다. 구체적으로, 실시간 데이터 소스에서 데이터를 수집하고 이 데이터를 주제(topic)에 게시하는 발행자(publisher)가 있다. 중간에 메시지 브로커(message broker)가 있어 발행자로부터 데이터를 주제를 구독한 구독자(subscriber)에게 라우팅한다.

Figure 2.1 Simple event-driven workflow


2.1 실시간 이벤트 수집

시스템이 진행 중인 항공편의 도착 시간을 예측하도록 하고 싶다고 가정하자. 이는 여행자들이 픽업 차량을 예약하거나 운전자가 공항에 도착할 최적의 시간을 결정하는 데 도움이 될 수 있다. 이 예측 시스템을 구축하기 위해서는 실시간 데이터 소스에서 데이터를 수집해야 한다. 시스템은 항공편 데이터를 스트리밍하고, 온라인 모델을 훈련하기 위해 진행 중인 날씨 업데이트와 같은 다른 실시간 데이터 소스와 통합할 필요가 있다. 또한, 시스템은 최종 사용자에게 실시간 업데이트를 제공하기 위해 항공편 예측을 스트리밍해야 한다.

이 섹션에서는 데이터 수집 부분에 초점을 맞출 것이다. 가장 간단한 형태로, 애플리케이션은 API로부터 데이터를 스트리밍하고 항공편 위치와 속도와 같은 중요한 이벤트를 캡처해야 한다. 데이터 이벤트를 처음부터 생성하고 이러한 이벤트를 실시간 데이터 스트림에 게시하는 방법을 배울 것이다.

다음 그림은 이 장의 결론에서 개발할 데이터 수집 애플리케이션의 아키텍처 다이어그램을 나타낸다.

Figure 2.2 Real-time flights data ingestion application



2.1.1 데이터 소스 선택

우리의 목표는 진행 중인 항공편의 도착 시간을 예측하는 나우캐스팅(nowcasting) 모델을 구축하는 것이다. 나우캐스팅은 예측(forecasting)과 유사하게 미래를 예측하려는 접근 방식이지만, 나우캐스팅은 최근의 트렌드에 크게 의존하여 가까운 미래의 값을 예측한다. 예를 들어, 허리케인이 어디에 상륙할지 또는 주식이 몇 시간 내에 어떤 방향으로 움직일지를 예측하는 것이다. 우리의 나우캐스팅 모델은 최근 항공편 위치를 고려하여 항공편이 언제 도착할지를 예측하려고 한다.


프로젝트를 위한 실시간 데이터 소스를 선택할 때 다음을 고려해야 한다.

이용 약관: 대부분의 API는 데이터 사용 방법에 대한 약관을 명시한다. 예를 들어, 일부 사용 계약은 데이터를 공개적으로 복사하거나 데이터를 사용하는 애플리케이션을 판매하는 것을 금지한다.

신뢰성: 실시간 데이터 프로젝트의 어려운 점 중 하나는 외부 서비스가 제대로 작동하기 위해 사용 가능해야 한다는 것이다. 데이터 소스를 선택할 때, API의 신뢰성을 평가하기 위해 초기 실사를 수행하는 것이 중요하다. 특히, 잘 유지되고 다른 사람들이 사용하는 데이터 소스를 선택하고자 한다. 이를 위한 특정 지표가 있다. 예를 들어, 신뢰할 수 있는 API는 예제가 포함된 자세한 문서를 가지고 있는 경우가 많다. API를 제공하는 조직이나 당사자가 공식 프로그래밍 클라이언트를 가지고 있고, GitHub 저장소가 잘 유지되고 있는지(예: 지난달에 업데이트됨, 유지 관리자가 문제에 응답함, 수백 개의 별이 있음) 확인하는 것이 좋은 신호다.

요율 제한: 대부분의 API는 스팸 공격을 방지하기 위해 일정한 요율 제한을 부과한다. 데이터 소스를 선택할 때 필요한 데이터의 양과 빈도를 결정하는 것이 필요하다. 또한, 실시간 데이터 소스의 제한을 처리하기 위한 전략을 사용할 것이다. 예를 들어, 이 장에서는 요청 요율 제한에 걸리지 않기 위해 폴링 간격 전략을 활용할 것이다.

데이터 품질: 외부 데이터 소스를 사용할 때는 제공되는 데이터에 제한된다. 예를 들어, "실시간"으로 광고된 데이터가 30초 간격으로만 제공될 수 있다. 사용 사례에 따라 얼마나 자주 새로운 데이터가 필요한지 이해하는 것이 중요하다. 가까운 미래의 주가 변동을 예측하려는 애플리케이션은 초 단위로 업데이트를 받아야 할 수 있지만, 월간 강수량을 예측하는 애플리케이션은 일일 업데이트만 필요할 수 있다. 다음 장에서는 온라인 모델링과 관련하여 실시간 데이터 기능의 타당성을 평가하는 방법을 탐구할 것이다.

데이터 형식: REST API는 비구조화된 JSON 데이터를 반환하는 경우가 많다. 데이터 수집 파이프라인의 목표 중 하나는 이 데이터를 정의된 구조로 신뢰성 있게 역직렬화할 수 있도록 하는 것이다. 외부 API를 사용할 때 잠재적인 문제는 데이터 스키마가 언제든지 변경될 수 있어 다운스트림 애플리케이션에 문제가 발생할 수 있다는 것이다. 이러한 이유로 가능한 한 공식 프로그래밍 언어 API를 사용하고, 필드 검증을 수행하여 호환성 문제를 쉽게 식별하고 수정하는 것이 중요하다.

우리의 나우캐스팅 모델을 구동하기 위해 진행 중인 항공편에 대한 데이터를 수집하고자 한다. 오프라인 학습을 위해서는 역사적인 항공편 데이터셋을 찾는 것으로 시작할 것이다. 반면, 우리의 모델은 예측을 위해 매우 최근의 데이터를 사용해야 한다. 따라서 항공편 데이터를 수집하는 실시간 데이터 소스가 필요하다. 다행히 OpenSky (https://opensky-network.org/) 네트워크는 상업 항공편을 포함한 진행 중인 항공 교통에 대한 실시간 비행 데이터를 제공한다. 작성 시점에서 이는 비영리 연구 목적으로 사용할 수 있다.

많은 API가 "실시간" API로 설명되지만 매우 다르게 설계될 수 있다는 점에 유의해야 한다. 일부는 업데이트를 푸시 기반 형식으로 제공하기 위해 장기 실행, 저지연 웹소켓을 활용하여 진정한 실시간이다. 이는 금융 분야에서 가장 일반적이며, 거래량이 많고 정보를 가능한 빨리 아는 것이 매우 가치가 있기 때문이다. 다른 API는 풀 기반이므로 데이터 업데이트를 위해 클라이언트가 주기적으로 요청해야 한다.


OpenSky는 REST 기반의 API를 제공하여 웹 요청을 할 수 있다. 특히, 진행 중인 항공편에 대한 "state vectors"를 반환하는 GET /states/all 경로에 관심이 있다. 내장된 Python의 urllib 라이브러리를 사용하여 API에 단일 요청을 하고 결과를 읽기 쉬운 텍스트로 출력할 것이다.

 from urllib import request
 
 with request.urlopen(“https://opensky- network.org/ api/states/ all”) as response:
 	print(response.read().decode(“utf-8”))

Listing 2.1 Querying current flight positions from OpenSky


이 코드의 출력은 항공편 위치의 거대한 목록이다.

…
 ["c00734","WJA2199","Canada",1728763865,1728763865,-70.7597,20.2082,10972.8,false,249.58,350.87,0.33,null,11658.6,nul
 l,false,0],
 ["a41b89","DAL799","United States",1728763866,1728763866,-105.9763,40.8415,10668,false,248.95,62.69,0.33,null,11102.34,null,fals
 e,0]


API는 모든 알려진 항공편의 현재 위치를 나타내는 JSON 인코딩 객체를 반환했다. 간단히 하기 위해 특정 지역으로 출력을 필터링하는 추가 매개변수를 API에 제공할 수 있다. 아래 수정된 코드는 위도와 경도 좌표를 사용하여 스위스 상공의 모든 현재 항공편을 쿼리하고, 결과를 더 쉽게 작업할 수 있는 Python 객체로 구문 분석한다.

 import json
 from urllib import request
 
 with request.urlopen(“https://opensky-network.org/api/states/all?lamin=45.8389&lomin=5.9962&lamax=47.8229&lomax=10.5226”) as response:
 	print(json.loads(response.read().decode(“utf-8”)))


아래는 요약된 출력이다.

{'time': 1728765860, 'states': [['4b1804', 'SWR560A ', 'Switzerland', 1728765859, 1728765859, 9.2725, 47.6737, 3970.02, False, 160.76, 254.41, -6.5, None, 4091.94, '1000', False, 0], ['4b1806', 'SWR2YA', 'Switzerland', 1728765773, 1728765837, 8.5633, 47.4417, 403.86, True, 0, 64.69, None, None, None, '2000', False, 0]]}

이제 관심 있는 데이터를 캡처하는 반복 가능한 코드를 갖게 되었으므로, 이 데이터를 실시간 기계 학습을 위한 개별 이벤트로 처리할 것이다.

2.1.2 실시간 이벤트 생성

이벤트는 무언가가 발생했다는 기록이다. 현재 예측 시스템의 맥락에서 이벤트는 비행기가 공항에 도착하거나 출발하는 것, 특정 지역의 날씨 업데이트와 같은 세계에서 발생한 특정한 일을 나타낼 수 있다. 그러나 이벤트는 애플리케이션 내부에서 발생하는 일을 나타낼 수도 있다. 예를 들어, 새로운 예측이 생성되면 시스템의 다른 부분에 전달되어야 한다. 이벤트 기반 시스템은 개별 이벤트의 데이터와 제어 신호를 전달함으로써 작동한다.

데이터 수집의 다음 단계는 쿼리된 비행 위치를 개별 이벤트로 변환하는 것이다. 최소한 각 이벤트는 데이터 페이로드와 이벤트가 발생한 시간을 나타내는 타임스탬프를 포함해야 한다. 시간에 민감한 시스템에서는 이벤트가 수신된 시점과 온라인 모델에 의해 최종적으로 처리되는 시점 사이에 지연이 발생하기 때문에 타임스탬프가 중요하다. 이벤트를 생성하는 것은 시스템의 다른 부분에서 해석할 수 있는 데이터 구조를 정의하는 것을 의미한다. 시작하기 위해 위치(위도와 경도), 속도, 방향(true_track), 출발 국가, 항공기의 고유 식별자(icao24)를 캡처하고자 할 수 있다. 아래 함수는 API 응답을 파싱하여 응답의 각 비행에 대한 업데이트 객체 목록을 반환한다. 응답이 시간 순서대로 보장되지 않기 때문에 time_position 타임스탬프에 따라 이벤트를 수동으로 정렬할 것이다.

이제 단일 API 응답을 일련의 이벤트로 파싱하는 함수를 갖게 되었다. 이 함수를 반복적으로 호출하여 현재 예측 시스템을 위한 실시간 데이터를 생성할 필요가 있다. 이벤트 기반 시스템 용어로 이것을 퍼블리셔(publisher)라고 한다. 퍼블리셔는 실시간 시스템의 다른 부분에 이벤트를 전달하는 장기 실행 프로세스이다. 우리의 경우, OpenSky API를 정기적으로 쿼리하고 시스템이 이해할 수 있는 이벤트를 생성하는 퍼블리셔가 필요하다.

Listing 2.3 비행 데이터로부터 이벤트 생성

#A API 응답의 모든 비행 상태를 반복한다.
#B 가장 이른 것부터 가장 늦은 것까지 정렬된 비행 이벤트 목록을 반환한다.

```python
def response_to_events(api_response):
flight_events = []
for update in api_response["states"]: #A
flight_events.append(
{
"icao24": update[0],
"origin_country": update[2],
"time_position": update[3],
"longitude": update[5],
"latitude": update[6],
"velocity": update[9],
"true_track": update[10]
}
)
return sorted(flight_events, key=lambda x: x["time_position"]) #B
```

Real-Time Machine Learning_MEAP_v01_p30

Page 1


--- Page 1 ---
데이터 API의 제약으로 인해 약간의 제한이 있다. HTTP/1.1은 풀 메커니즘이므로 API는 새로운 업데이트가 있을 때 알려주지 않는다. 유일한 방법은 주기적으로 서버에 폴링하여 항공편 업데이트를 가져오는 것이다. 퍼블리셔를 구현할 때 서버를 폴링할 간격을 결정해야 한다. 이 간격이 길수록 시스템은 실시간성이 떨어지게 되며, 오래된 데이터를 기반으로 운영하게 된다. 그러나 간격이 짧으면 API 유지 관리자가 부과한 속도 제한에 걸릴 위험이 있다. 아래 그림은 서로 다른 폴링 간격을 선택했을 때 얻을 수 있는 다양한 결과를 보여준다.

짧은 간격은 더 많은 데이터 포인트와 작은 변화를 생성하고, 긴 간격은 더 적은 데이터 포인트와 더 큰 변화를 생성한다.

Real-Time Machine Learning_MEAP_v01_p31

Page 1


--- Page 1 ---
이상적으로는 가능한 한 빨리 폴링하여 가장 실시간 데이터를 가져오는 것이 좋다. 그러나 실제로는 지속적으로 폴링하는 것이 컴퓨팅 자원을 낭비하고, 빠르게 속도 제한에 부딪히기 때문에 타협해야 한다. 특정 사용 사례에 적합한 폴링 간격을 어떻게 결정할 수 있을까? 첫 번째로 살펴볼 것은 API의 문서이다. 문서에는 기본 데이터가 얼마나 자주 업데이트되는지와 하루/시간/분당 허용되는 요청 수가 명시되어 있어야 한다. 이는 적절한 간격을 결정하는 데 좋은 출발점을 제공한다. 예를 들어, 작성 시점에 OpenSky 문서에 따르면 인증되지 않은 사용자는 10초마다 새로운 데이터를 가져올 수 있으며, 500x500 km 지역 내에서 24시간 동안 400개의 /api/states/all 요청이 허용된다. 따라서 최신 데이터를 얻기 위해 시스템을 10초마다 폴링할 수 있지만, 시스템을 하루 종일 실행하려면 폴링 간격이 24 * 60 / 400 = 3.6분 이상이어서는 안 된다. 일반적으로 30초에서 5분 사이가 좋은 출발점이다. 그러나 이는 데이터의 중요성과 데이터가 얼마나 자주 변경될 가능성이 있는지에 따라 다르다. 경보 시스템 및 금융 시장과 같은 시간에 민감한 사용 사례의 경우 폴링 간격은 1분 미만이어야 한다. 서버 상태 모니터링이나 날씨 추적과 같이 더 천천히 변하거나 안정적인 시스템의 경우 적절한 간격은 5분에 가깝다.

10초 간격으로 시작하여 폴링이 실제로 어떻게 작동하는지 확인할 것이다. 아래 코드는 정의된 간격으로 API를 쿼리하고 응답 데이터를 이벤트의 반복 가능한 형태로 변환하는 이벤트 생성기이다.

이 생성기는 Python의 for 루프에서 반복하여 이벤트를 생성할 수 있다.

이 코드를 실행한 출력은 다음과 같다.

Listing 2.4 비행 업데이트 이벤트 생성기
#A 세 번의 반복 후 실행을 중지한다.
#B yield 키워드는 이 함수를 루프의 매 반복마다 이벤트를 반환하는 생성기로 만든다.
#C API를 스팸하지 않기 위해 시간 간격 동안 대기한다.
import time
from urllib import request
def get_events(url="https://opensky-network.org/api/states/all?lamin=45.8389&lomin=5.9962&lamax=47.8229&lomax=10.5226", interval_sec=10):
for _ in range(3): #A
with request.urlopen(url) as response:
yield from response_to_events(json.loads(response.read().decode("utf-8"))) #B
time.sleep(interval_sec) #C

Listing 2.5 이벤트 생성기 호출
for event in get_events():
print(event)

Real-Time Machine Learning_MEAP_v01_p32

Page 1


--- Page 1 ---
{'icao24': '4d21ef', 'origin_country': 'Malta', 'time_position': 1729024220, 'longitude': 8.0245, 'latitude': 46.7367, 'velocity': 228.51, 'true_track': 144.81}

{'icao24': '3986e1', 'origin_country': 'France', 'time_position': 1729024220, 'longitude': 6.5155, 'latitude': 47.0584, 'velocity': 209.11, 'true_track': 210.29}

{'icao24': '4d223b', 'origin_country': 'Malta', 'time_position': 1729024220, 'longitude': 7.23, 'latitude': 47.5584, 'velocity': 219.62, 'true_track': 145.3}

출력은 연속적이지 않고 이벤트 기반이다. 트리거 이벤트는 시간 기반이며, 매 분마다 API가 호출될 때 새로운 이벤트 배치가 생성된다. 세 번의 배치 후 생성기가 반환되고 프로세스가 종료된다. 실제 프로젝트에서는 생성기가 무한히 쿼리할 것이다. 이는 프로세스가 결국 API 속도 제한에 도달할 수 있음을 의미한다. 장시간 실행되는 시스템은 이러한 오류를 감지하고 백오프(backoff) 전략을 구현해야 한다. 백오프 전략은 이와 같은 자동 쿼리 시스템이 지속적인 오류 루프에 빠지지 않도록 돕는다. 백오프 전략을 구현하는 방법은 4장에서 완전한 비행 예측 시스템을 구축할 때 논의할 것이다.

마지막으로 데이터를 저장할 필요가 있다. 오프라인 모델링을 위해 일반적으로 데이터 인스턴스를 데이터베이스와 같은 쿼리 가능한 저장 형식으로 유지한다. 현재는 각 줄이 JSON 형식으로 인코딩된 단일 비행 이벤트를 나타내는 JSON 라인 파일에 이벤트를 덤프하여 이를 시뮬레이션할 것이다. 이 시점에서 모든 기능을 FlightPublisher 클래스에 공식화할 것이다. 이는 구성된 객체로 퍼블리셔를 쉽게 시작할 수 있게 한다.

초기 비행 데이터 퍼블리셔
```python
class FlightPublisherV1:
def __init__(self, url, interval_sec, file_path):
self.url = url
self.interval_sec = interval_sec
self.file_path = file_path
def response_to_events(self, api_response): #A
flight_events = [] #A
for update in api_response["states"]: #A
flight_events.append( #A
{
"icao24": update[0], #A
"origin_country": update[2], #A
"time_position": update[3], #A
"longitude": update[5], #A
"latitude": update[6], #A
"velocity": update[9], #A
"true_track": update[10], #A
} #A
) #A
return sorted(flight_events, key=lambda x: x["time_position"]) #A
```

Real-Time Machine Learning_MEAP_v01_p33

Page 1


--- Page 1 ---
다음 코드를 실행하면 주기적으로 비행 업데이트 이벤트 배치를 flight_updates.jsonl에 덤프한다.

현재 실시간 데이터 소스에서 데이터를 추출하고, 간단한 데이터 필터링을 수행하며, 데이터 이벤트의 순서가 있는 로그를 유지하는 퍼블리셔가 있다. 이는 오프라인 학습에서 일반적인 추출-변환-적재(ETL) 파이프라인을 반영한다. 그러나 온라인 학습에서는 이 이벤트 로그를 가능한 한 실시간에 가깝게 처리하는 데 관심이 있다. 온라인 모델은 이 데이터를 구문 분석하고 실시간 출력으로 추론을 생성해야 한다. 다음 섹션에서는 실시간으로 수집된 비행 데이터를 온라인 모델링을 위해 처리하는 스트리밍 ETL 파이프라인을 생성할 것이다.

2.2 실시간 데이터 처리

현재 비행 데이터는 두 가지 상태로 존재한다. FlightPublisher가 API에서 업데이트를 읽고 처리하면서 진정한 실시간 형식으로 존재한다. 그런 다음, 업데이트를 디스크에 이벤트 로그로 유지하여 데이터가 역사적인 형식이 된다. 역사적인 형식은 디버깅에 유용하고 데이터를 이해하는 데 도움이 되지만, 온라인 모델링에는 그다지 유용하지 않다.

온라인 모델을 훈련하고 활용하기 위해서는 별도의 프로세스 간에 실시간 데이터를 통신할 수 있어야 한다. 이 섹션에서는 이벤트 기반 아키텍처의 기본 구성 요소를 배우고 온라인 모델링을 준비하기 위해 실시간 수집 구현을 완료할 것이다.

#A API 응답에서 개별 비행 업데이트를 추출한다.
#B API를 폴링하고 비행 업데이트 이벤트를 생성하는 제너레이터이다.
#C 퍼블리셔를 계속 실행하고 이벤트를 파일에 추가한다.
def get_events(self): #B
while True: #B
with request.urlopen(self.url) as response: #B
yield from self.response_to_events( #B
json.loads(response.read().decode("utf-8")) #B
) #B
time.sleep(self.interval_sec) #B
def run(self): #C
for event in self.get_events(): #C
with open(self.file_path, "a") as file: #C
file.write(json.dumps(event) + "\n") #C

Listing 2.7 비행 데이터 퍼블리셔 호출
publisher = FlightPublisherV1(url="https://opensky-​network.org/​api/states/​all?lamin=45.​
8389&lomin=5.​9962&lamax=47.​8229&lomax=10.​5226", interval_sec=60,
file_path="flight_updates.jsonl")
publisher.run()

Real-Time Machine Learning_MEAP_v01_p34

Page 1


--- Page 1 ---
2.2.1 메시지 큐

현재 FlightPublisher는 읽을 수 있는 파일에 순서대로 이벤트를 출력하고 있다. 이것은 사실 메시지 큐의 기본적인 예이다. 메시지 큐는 서로 다른 프로세스가 비동기적으로 통신할 수 있도록 하는 버퍼이다. 메시지 큐에 쓰는 프로세스를 퍼블리셔(publisher) 또는 프로듀서(producer)라고 하고, 메시지 큐를 읽는 프로세스를 구독자(subscriber) 또는 소비자(consumer)라고 한다. 메시지 큐의 장점은 퍼블리셔와 구독자가 독립적으로 작동할 수 있다는 것이다. 예를 들어, 머신 러닝에 관한 잡지를 구매하고 싶다고 가정해 보자. 서점에 가서 이번 달의 ML Digest를 직접 구매할 수 있다. 이는 양측이 조율하여 거래를 완료해야 한다. 이는 OpenSky API에 요청을 보내고 단일 응답을 받는 것과 유사하다. 다른 방법은 잡지 구독을 구매하는 것이다. 이 경우, 잡지 배급자는 매달 발행물을 우편함에 넣는 것만 책임진다. 이는 FlightPublisher가 파일 시스템에 이벤트를 기록하는 것과 유사하다.

메시지 큐를 사용하여 비행 예측 시스템의 다양한 프로세스를 분리하고자 한다. 예를 들어, 이는 FlightPublisher가 구독자가 데이터를 처리하기를 기다리지 않고 새로운 비행 데이터를 계속 수집할 수 있도록 한다. 그러나 파일 버퍼 접근 방식에는 몇 가지 문제가 있다. 하나의 문제는 파일이 전통적으로 처음부터 끝까지 순차적으로 읽힌다는 것이다. 더 많은 이벤트가 게시될수록 파일의 크기가 증가하여 파일을 읽는 데 시간이 더 오래 걸린다. 또한, 가장 최근의 이벤트는 항상 끝에 위치한다. 독자가 새로운 이벤트를 수신하려면 파일의 최근 줄로 이동해야 한다. 이는 구독자가 마지막으로 읽은 위치를 추적해야 하며, 그렇지 않으면 중복된 이벤트를 수신하게 된다. 이러한 동작은 아래 그림에 설명되어 있다.

Real-Time Machine Learning_MEAP_v01_p35

Page 1


--- Page 1 ---
그림 2.5 구독자가 이벤트 로그를 처음 읽을 때 3개의 이벤트를 읽는다. 두 번째 읽기에서는 새로운 이벤트를 읽기 전에 3개의 이벤트를 앞으로 건너뛰어야 한다. 메시지 큐가 없으면 구독자는 중복 이벤트를 받지 않기 위해 마지막으로 읽은 오프셋을 추적해야 한다.

이제 파일 버퍼 접근 방식을 실제 메시지 큐를 사용하도록 업그레이드할 것이다. Apache Kafka, Google PubSub, RabbitMQ와 같은 많은 메시지 큐 솔루션이 있다. RabbitMQ를 사용할 것이다. 왜냐하면 최소한의 코드로 로컬에 큐를 설정할 수 있기 때문이다. 메시지 브로커로서 RabbitMQ는 발행자와 구독자로부터의 연결 요청을 수락하고, 큐 생성 처리, 발행자로부터 보낸 메시지를 올바른 큐로 전달하는 것뿐만 아니라, 발행자가 연결을 닫은 후에도 큐에 정보를 지속시키는 역할을 한다. 이는 발행자와 구독자가 비동기적으로 통신할 수 있게 해주기 때문에 중요하다. 이러한 분리는 시스템이 학습하고 추론을 생성하는 동안 데이터 수집이 계속될 수 있기 때문에 실시간 머신 러닝에 유용하다.

첫 번째 단계는 RabbitMQ 관리 서비스를 실행하는 것이다. 서비스를 시작하는 가장 쉬운 방법은 Docker 이미지를 가져와 실행하는 것이다. Docker가 없다면, 여기(https://docs.docker.com/get-started/get-docker/)의 지침을 따라 시스템에 설치할 수 있다. 다음 명령어를 사용하여 RabbitMQ 서비스를 시작한다.

```bash
docker run -it --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:4.0-management
```

서비스가 성공적으로 시작되었다면 시작 확인 메시지를 볼 수 있을 것이다.

```
2024-10-18 02:52:36.304715+00:00 [info] <0.651.0> Server startup complete; 4 plugins started.
2024-10-18 02:52:36.304715+00:00 [info] <0.651.0> * rabbitmq_prometheus
2024-10-18 02:52:36.304715+00:00 [info] <0.651.0> * rabbitmq_management
2024-10-18 02:52:36.304715+00:00 [info] <0.651.0> * rabbitmq_management_agent
2024-10-18 02:52:36.304715+00:00 [info] <0.651.0> * rabbitmq_web_dispatch
2024-10-18 02:52:36.409833+00:00 [info] <0.9.0> Time to start RabbitMQ: 12912 ms
```

서버와 상호작용하려면 RabbitMQ가 지원하는 메시징 프로토콜을 사용하는 클라이언트가 필요하다. 다행히도 Pika는 순수 Python으로 Advanced Message Queuing Protocol (AMQP)을 구현하는 패키지이다. AMQP는 네트워크 프로세스 간의 메시지 기반 통신을 정의하는 개방형 표준 애플리케이션 계층 프로토콜이다. 우리의 경우, 이는 Python 코드에서 Erlang으로 작성된 RabbitMQ 서버로 메시지를 보낼 수 있게 해준다. 별도의 터미널 창에서 다음 명령어를 실행하여 Pika를 설치한다.

다음으로, 큐에 메시지를 보내 작동하는지 확인할 수 있다. Pika를 사용하여 메시지를 보내기 전에 큐를 선언해야 한다. 다음 코드를 실행하면 로컬 메시지 서비스에 연결을 설정하고, flight_updates 큐에 단일 메시지를 발행하고, 연결을 닫는다.

Real-Time Machine Learning_MEAP_v01_p36

Page 1


--- Page 1 ---
다음은 코드를 실행한 결과이다.

보낸 메시지: {'icao24': 'abc123', 'origin_country': 'United States'}

Pika를 사용하여 큐에서 메시지를 소비하거나 구독할 수도 있다. 구독자 코드는 메시지로 무언가를 수행해야 하기 때문에 조금 더 복잡하다. 현재는 메시지를 출력하지만, 궁극적으로는 수신 데이터를 사용하여 온라인 추론을 수행할 것이다. 여기서는 구독자가 새 메시지를 받을 때마다 호출되는 간단한 콜백 함수(callback function)를 구성할 것이다.

데이터를 큐에 게시하기 (Listing 2.8)
#A 로컬에서 실행 중인 메시지 브로커에 연결한다.
#B 메시지 큐가 존재하는지 확인한다.
#C JSON 페이로드(payload)를 메시지 큐에 게시한다.
```python
import json
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters("localhost")) #A
channel = connection.channel() #A
channel.queue_declare(queue="flight_updates") #B
data = {"icao24": "abc123", "origin_country": "United States"}
channel.basic_publish(exchange="", routing_key="flight_updates", body=json.dumps(data)) #C
print("Sent message:", data)
connection.close()
```

큐에서 데이터를 소비하기 (Listing 2.9)
#A 큐에서 새 메시지를 받을 때 호출되는 함수 콜백(callback)이다.
#B 메시지 큐가 존재하는지 확인한다.
#C 소비자가 자동으로 확인 메시지를 보내도록 구성한다 (fire-and-forget 방식).
#D 큐에서 이벤트 소비를 시작한다.
```python
import json
import pika
def handle_message(channel, method, properties, body): #A
print(f"Received message: {body}") #A
connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
channel = connection.channel()
channel.queue_declare(queue="flight_updates") #B
channel.basic_consume( #C
queue="flight_updates", on_message_callback=handle_message, auto_ack=True #C
) #C
channel.start_consuming() #D
```

© Manning Publications Co. To comment go to liveBook

Real-Time Machine Learning_MEAP_v01_p37

Page 1


--- Page 1 ---
큐를 다시 선언해야 하는 이유가 궁금할 수 있다. 이미 퍼블리셔 측에서 선언했기 때문이다. 그 이유는 프로덕션 시스템에서 퍼블리셔와 구독자가 서로 다른 기계에서 작동할 가능성이 높기 때문이다. 보조 통신 채널이 없으면 어느 쪽도 상대방이 시작했는지 알 수 없다. 다행히 queue_declare 호출은 멱등성(idempotent)을 가지며, 여러 번 호출해도 하나의 큐만 생성된다. 따라서 존재 여부를 확인하기보다는 실수로 두 번 호출하는 것이 더 쉽다. 퍼블리시 코드를 이전에 실행했다면, 이 코드는 다음 메시지를 출력해야 한다.

수신된 메시지: b'{"icao24": "abc123", "origin_country": "United States"}'

이전과 동일하므로 퍼블리셔에서 구독자로 메시지를 큐를 통해 성공적으로 전송했다. 구독자 프로세스가 대기 상태에 있는 것을 주목해야 한다. 이는 버그처럼 보일 수 있지만 실제로 의도된 동작이다. start_consuming 함수는 새로운 메시지를 기다리는 루프를 구현한다. 이는 이벤트 기반 시스템의 일반적인 패턴이다. 구독자는 시스템에서 무언가가 발생하기를 기다린 후 새로운 데이터를 생성하거나 내부 상태를 업데이트하는 등의 작업을 수행한다. 퍼블리셔 코드를 다시 실행하면 구독자가 또 다른 메시지를 출력하는 것을 확인할 수 있다.

이제 큐에서 읽는 구성 가능한 비행 구독자를 만들 수 있다. subscribe.py라는 새 파일을 만들고 다음 코드를 포함한다.

Real-Time Machine Learning_MEAP_v01_p38

Page 1


--- Page 1 ---
구독자는 표준 파이썬 프로그램처럼 실행할 수 있다.

```python
python subscriber.py
```

발행자가 실행되지 않았기 때문에 아직 출력이 보이지 않을 것이다. 다음은 큐(queue)를 사용하는 업데이트된 FlightPublisher 버전으로, publisher.py로 저장할 수 있다.

Listing 2.10 메시지 큐를 사용하는 항공편 구독자

```python
#A 항공편 업데이트를 수신할 때 호출되는 콜백 함수.
#B 메시지 큐에서 이벤트를 지속적으로 수신하는 블로킹 함수.
#C 구독자를 실행한다.
import pika

class FlightSubscriberV1:
def __init__(self, queue_name):
self.queue_name = queue_name

def process_message(self, channel, method, properties, body): #A
print(f"Received flight update: {body}") #A

def run(self): #B
connection = pika.BlockingConnection(pika.ConnectionParameters("localhost")) #B
channel = connection.channel() #B
channel.queue_declare(queue=self.queue_name) #B
channel.basic_consume( #B
queue=self.queue_name, #B
on_message_callback=self.process_message, #B
auto_ack=True, #B
) #B
channel.start_consuming() #B

subscriber = FlightSubscriberV1(queue_name="flight_updates")
subscriber.run() #C
```

Listing 2.11 메시지 큐를 사용하는 항공편 발행자

```python
import json
import pika
import time
from urllib import request

class FlightPublisherV2:
def __init__(self, url, interval_sec, queue_name):
self.url = url
self.interval_sec = interval_sec
self.queue_name = queue_name

def response_to_events(self, api_response): #A
```

© Manning Publications Co. To comment go to liveBook

Real-Time Machine Learning_MEAP_v01_p39

Page 1


--- Page 1 ---
이제 두 번째 터미널 창에서 퍼블리셔를 실행한다.

API 응답을 항공편 업데이트 이벤트로 변환한다.
API를 폴링하고 항공편 업데이트 이벤트를 생성하는 제너레이터이다.
퍼블리셔를 무기한 실행하고 메시지 큐에 이벤트를 게시하는 함수이다.
시연을 위한 작은 지연이다.
퍼블리셔를 실행하는 블로킹 호출이다.

```python
flight_events = [] # API 응답을 항공편 업데이트 이벤트로 변환한다.
for update in api_response["states"]: # API 응답을 항공편 업데이트 이벤트로 변환한다.
flight_events.append( # API 응답을 항공편 업데이트 이벤트로 변환한다.
{ # API 응답을 항공편 업데이트 이벤트로 변환한다.
"icao24": update[0], # API 응답을 항공편 업데이트 이벤트로 변환한다.
"origin_country": update[2], # API 응답을 항공편 업데이트 이벤트로 변환한다.
"time_position": update[3], # API 응답을 항공편 업데이트 이벤트로 변환한다.
"longitude": update[5], # API 응답을 항공편 업데이트 이벤트로 변환한다.
"latitude": update[6], # API 응답을 항공편 업데이트 이벤트로 변환한다.
"velocity": update[9], # API 응답을 항공편 업데이트 이벤트로 변환한다.
"true_track": update[10], # API 응답을 항공편 업데이트 이벤트로 변환한다.
} # API 응답을 항공편 업데이트 이벤트로 변환한다.
) # API 응답을 항공편 업데이트 이벤트로 변환한다.
return sorted(flight_events, key=lambda x: x["time_position"]) # API 응답을 항공편 업데이트 이벤트로 변환한다.

def get_events(self): # API를 폴링하고 항공편 업데이트 이벤트를 생성하는 제너레이터이다.
while True: # API를 폴링하고 항공편 업데이트 이벤트를 생성하는 제너레이터이다.
with request.urlopen(self.url) as response: # API를 폴링하고 항공편 업데이트 이벤트를 생성하는 제너레이터이다.
yield from self.response_to_events( # API를 폴링하고 항공편 업데이트 이벤트를 생성하는 제너레이터이다.
json.loads(response.read().decode("utf-8")) # API를 폴링하고 항공편 업데이트 이벤트를 생성하는 제너레이터이다.
) # API를 폴링하고 항공편 업데이트 이벤트를 생성하는 제너레이터이다.
time.sleep(self.interval_sec) # API를 폴링하고 항공편 업데이트 이벤트를 생성하는 제너레이터이다.

def run(self): # 퍼블리셔를 무기한 실행하고 메시지 큐에 이벤트를 게시하는 함수이다.
connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
channel = connection.channel()
channel.queue_declare(queue=self.queue_name)
for event in self.get_events():
channel.basic_publish(
exchange="", routing_key=self.queue_name, body=json.dumps(event)
)
time.sleep(0.1) # 시연을 위한 작은 지연이다.
connection.close()

publisher = FlightPublisherV2(url="https://opensky-​network.org/​api/states/​all?lamin=45.​
8389&lomin=5.​9962&lamax=47.​8229&lomax=10.​5226", interval_sec=60,
queue_name="flight_updates")
publisher.run() # 퍼블리셔를 실행하는 블로킹 호출이다.
```

© Manning Publications Co. To comment go to liveBook

Real-Time Machine Learning_MEAP_v01_p40

Page 1


--- Page 1 ---
파이썬 publisher.py

출판자와 구독자가 동일한 순서로 동일한 이벤트를 출력하는 것을 볼 수 있다. 이제 실시간 데이터를 수집하고 이를 하류 구독자에게 스트리밍하는 기본 이벤트 기반 파이프라인을 만들었다. 이는 이전에 구현한 파일 버퍼 메커니즘보다 개선된 점이다. 구독자가 새로운 데이터가 언제 사용 가능한지 추측할 필요가 없기 때문이다. 대신 구독자는 진정한 이벤트 기반으로 새로운 비행 이벤트에 즉시 반응할 수 있다.

2.2.2 메시지 큐의 문제점
메시지 큐는 실시간 머신 러닝을 위한 비동기 데이터 전송을 구현하는 간단한 방법이다. 그러나 출판자를 중지하고 구독자를 다시 시작한 후 시도해보면 어떤 일이 발생할지 생각해보라.

이 경우 다시 시작된 구독자는 실행 중이지만 이전 메시지를 받지 못한다. 구독자가 모든 이전 메시지를 출력할 것으로 예상했을 수 있다. 사실, 이전 메시지는 큐에서 제거되어 더 이상 검색할 수 없다. 이는 메시지 큐의 주요 목표가 서비스 간 비동기 통신을 가능하게 하는 것이지 데이터를 지속하는 것이 아니기 때문이다. 엄격한 실시간 사용 사례에서는 이것이 바람직하지만, 실시간 머신 러닝에서는 역사적 데이터를 쿼리할 수 있기를 원하는 상황이 있다. 예를 들어, 데이터 스트림을 재생하여 온라인 모델을 평가하거나 디버그하고 싶을 수 있다. 메시지 큐를 사용하면 스트림을 되감거나 이전 값을 쿼리할 수 없다.

지금까지 본 예제에서는 모든 메시지가 출판자에서 구독자로 성공적으로 전달되었다. 그러나 비동기 통신에서는 메시지가 누락될 가능성이 항상 있다. 구독자가 메시지를 수신하려고 시도하는 동안 브로커와의 연결이 끊어져 출판자가 보낸 메시지를 받지 못할 수 있다. RabbitMQ는 큐에 대한 게시를 트랜잭션으로 만들어 구독자에게 전달되지 않은 메시지를 다시 보내는 책임을 출판자에게 부여하여 이를 해결한다.

이는 최소 한 번 전달의 예이다. 최소 한 번 전달은 구독자가 결국 이벤트를 수신하게 하지만 중복이 발생할 수 있다. 반면, 최대 한 번 전달은 중복이 수신되지 않도록 보장하지만 이 경우 메시지가 손실될 가능성이 있다. 이 두 가지 의미의 차이는 아래 그림에 설명되어 있다.

Real-Time Machine Learning_MEAP_v01_p41

Page 1


--- Page 1 ---
그림 2.6 최소 한 번 전달은 모든 메시지가 적어도 한 번 전달되도록 보장하지만 중복이 발생할 수 있다. 최대 한 번 전달은 중복을 방지하지만 메시지 전달에 실패할 수 있다.

비동기 메시징의 이상적인 의미론은 정확히 한 번 전달이지만, 이는 매우 특정한 시나리오를 제외하고는 달성할 수 없다. 이는 네트워크 통신의 제약을 설명하는 유명한 두 장군의 사고 실험과 관련이 있다. 이 사고 실험은 E. A. Akkoyunlu, K. Ekanadham, R. V. Huber가 1975년에 발표한 논문에서 처음 소개되었다. 이 시나리오는 신뢰할 수 없는 링크를 통해 통신하는 두 당사자가 행동을 조정하는 것이 불가능하다는 것을 보여준다. 구독자가 메시지가 제대로 수신되었음을 게시자에게 확인 메시지를 보내더라도, 그 확인 메시지 자체가 손실되지 않는다는 보장은 없다.

현실적으로, 엔지니어들은 중복 데이터 처리와 데이터 누락 중 하나를 선택해야 한다. 실시간 머신러닝에서는 정확한 사용 사례에 따라 달라진다. 금융 사기 탐지 시스템에서는 중요한 거래를 놓치는 것이 모델의 예측 정확성에 영향을 미친다면 최소 한 번 전달이 선호된다. 사용자 추천과 같은 덜 중요한 시나리오에서는 사용자에게 추천이 전달되는 속도가 가끔 데이터 포인트를 잃는 것보다 더 중요하다면 최대 한 번 전달이 허용된다. 비행 예측 시스템에서는 이벤트 누락이 바람직하지 않다. 이는 온라인 모델링을 위한 데이터가 적어지고 정확한 예측을 하고자 하기 때문이다. 다행히도 항공기 ID와 타임스탬프가 고유 ID를 형성하므로 중복을 감지하는 것은 어렵지 않다. 다음 함수는 FlightSubscriber에 추가하여 현재 비행 위치를 기록하고 이벤트가 이미 처리되었는지 확인할 수 있다.

Real-Time Machine Learning_MEAP_v01_p42

Page 1


--- Page 1 ---
메시지 큐는 임시 버퍼로 사용되기 때문에 크기가 제한되어 있다. 구독자가 발행자보다 느리게 작동하면 큐의 메시지 수가 계속 증가한다. 이러한 현상을 역압(backpressure)이라고 하며, 이는 데이터 처리량이 구독자의 처리 속도에 의해 제한되는 것을 의미한다. 이는 러시아워 동안 고속도로에 너무 많은 차가 있어 교통 속도가 느려지는 것과 유사하다.

역압을 관리하는 방법에는 몇 가지가 있다. 한 가지 방법은 발행자의 속도를 의도적으로 늦추는 것이다. 실시간 기계 학습에서는 이 옵션이 바람직하지 않을 수 있는데, 이는 더 이상 관련성이 없는 오래된 데이터를 수집하게 되기 때문이다. 또 다른 방법은 큐에 대한 압력을 줄이기 위해 메시지를 삭제하는 것이며, 이는 데이터 손실로 이어진다. 높은 처리량의 데이터 수집을 처리할 때 가장 좋은 방법은 구독자의 수를 늘리는 것이다. 물론, 이는 시스템의 복잡성을 증가시킨다.

같은 큐에서 여러 구독자가 읽도록 하는 기능적 이유가 있다. 예를 들어, 온라인 모델이 생성한 추론은 여러 사용자 인터페이스 애플리케이션에서 읽을 수 있다. 그러나 메시지는 소비되면 큐에서 제거되므로 두 개의 다른 구독자가 큐에서 동일한 이벤트를 받을 수 없다. 이러한 동작을 구현하기 위해서는 실시간 버퍼 메커니즘이 필요하며, 이는 역사적 로그로도 지속될 수 있어야 한다: 이벤트 스트림(event stream)이다.

중복 메시지 감지
#A 이 항공기에 대한 최신 이벤트가 아니라면 중복이다.
#B 이 항공기에 대한 최신 이벤트를 캐시에 저장한다.
def check_duplicate(self, event):
if (
event["icao24"] in self.flights
and event["time_position"] <= self.flights[event["icao24"]]["time_position"]
):
return True
self.flights[event["icao24"]] = event
return False

Real-Time Machine Learning_MEAP_v01_p43

Page 1


--- Page 1 ---
2.2.3 이벤트 스트림

이벤트 스트림은 실시간으로 소비 가능하고, 과거 데이터를 조회할 수 있도록 설계되었다. 실질적으로 이벤트 스트림은 순서가 있는, 추가만 가능한 이벤트 로그로 존재한다. 이는 여러 발행자와 구독자 간의 비동기 통신을 위한 유용한 데이터 구조이다. 이러한 방식으로 이벤트 스트림은 이벤트 데이터를 디스크에 저장함으로써 큐의 많은 제한 사항을 해결한다. 이벤트 스트림은 여러 발행자와 구독자 간의 이벤트 통신을 관리하는 이벤트 브로커(event brokers)라는 특수한 프로세스를 필요로 한다.

RabbitMQ 브로커는 이미 지속적인 데이터 스트림을 지원한다. 큐를 생성할 때 x-queue-type=stream 옵션을 추가하고, durable=True로 설정하여 브로커의 재부팅 시에도 스트림이 지속되도록 이벤트 스트림을 정의할 수 있다. 여기에는 새로운 이벤트 스트림에 비행 업데이트를 기록하는 업데이트된 발행자가 있다. flight_updates가 이미 durable=False로 정의되어 있기 때문에 브로커는 동일한 이름을 사용할 수 없도록 한다. 대신, flight_events라는 새로운 스트림을 생성할 것이다.

Listing 2.13 이벤트 스트림을 사용하는 비행 발행자

```python
import json
import pika
import time
from urllib import request

class FlightPublisherV3:
def __init__(self, url, interval_sec, stream_name):
self.url = url
self.interval_sec = interval_sec
self.stream_name = stream_name

def response_to_events(self, api_response):
flight_events = []
for update in api_response["states"]:
flight_events.append(
{
"icao24": update[0],
"origin_country": update[2],
"time_position": update[3],
"longitude": update[5],
"latitude": update[6],
"velocity": update[9],
"true_track": update[10],
}
)
return sorted(flight_events, key=lambda x: x["time_position"])

def get_events(self):
while True:
with request.urlopen(self.url) as response:
yield from self.response_to_events(response.read())
```

Manning Publications Co.의 저작권 ©. 의견을 남기려면 liveBook으로 이동하십시오.

Real-Time Machine Learning_MEAP_v01_p44

Page 1


--- Page 1 ---
`publisher.py`를 다시 실행하면 이전과 유사한 출력이 생성된다. 그러나 브로커 터미널에서 다른 출력을 볼 수 있으며, 이는 이벤트가 실제로 디스크에 저장되고 있음을 의미한다.

2024-10-19 15:41:28.879707+00:00 [info] <0.4238.0> rabbit_stream_coordinator: rabbit@6bcd5c37b84b에서 __flight_events_1729352488363061228의 writer가 1에서 시작되었다고 한다.

2024-10-19 15:41:28.879955+00:00 [info] <0.4239.0> Stream: __flight_events_1729352488363061228이 osiris 로그 데이터 디렉토리로 /var/lib/rabbitmq/mnesia/rabbit@6bcd5c37b84b/stream/__flight_events_1729352488363061228을 사용할 것이라고 한다.

2024-10-19 15:41:28.910821+00:00 [info] <0.4239.0> osiris_writer:init/1: 이름: __flight_events_1729352488363061228 마지막 오프셋: -1 커밋된 청크 ID: -1 에포크: 1이라고 한다.

퍼블리셔를 지금 중지한다. 이벤트가 게시되었는지 확인하기 위해 새로운 구독자가 이를 검색할 수 있는지 테스트할 것이다. 또한 구독자 동작을 구성할 몇 가지 옵션이 있다.

첫 번째는 구독자 확인이다. 이벤트 기반 시스템에서 브로커는 이벤트가 구독자에게 성공적으로 전달되었는지 알아야 하며, 이를 통해 이벤트를 발신 대기열에서 제거할 수 있다. 구독자는 이벤트가 성공적으로 처리되면 즉시 확인 신호를 보내야 한다. 큐를 사용할 때 이전에 auto_ack=True로 설정했다. 이는 브로커가 이벤트를 구독자에게 전달하자마자 삭제하는 가장 높은 처리량 동작을 초래했다(“발사 후 잊기” 방법).

#A API 응답을 항공편 이벤트로 변환한다.
#B API를 폴링하고 항공편 이벤트를 생성하는 제너레이터이다.
#C 브로커의 재부팅 시에도 지속되는 이벤트 스트림을 생성한다.
#D JSON 이벤트를 이벤트 스트림에 게시한다.
#E 퍼블리셔를 실행하여 이벤트 게시를 시작한다.

```python
json.loads(response.read().decode("utf-8")) #B
) #B
def run(self):
connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
channel = connection.channel()
channel.queue_declare(
queue=self.stream_name,
durable=True, #C
arguments={"x-queue-type": "stream"}
)
for event in self.get_events():
print("Sending flight update:", event)
channel.basic_publish( #D
exchange="", routing_key=self.stream_name, body=json.dumps(event) #D
) #D
connection.close()
publisher = FlightPublisherV3(url="https://opensky-​network.org/​api/states/​all?lamin=45.​
8389&lomin=5.​9962&lamax=47.​8229&lomax=10.​5226", interval_sec=60,
stream_name="flight_events")
publisher.run() #E
```

39 © Manning Publications Co. To comment go to liveBook

Real-Time Machine Learning_MEAP_v01_p45

Page 1


--- Page 1 ---
그러나 구독자가 이벤트를 처리할 수 없는 여러 시나리오가 있다. 예를 들어, JSON 이벤트가 정의된 스키마에 대해 유효성 검사를 실패할 수 있다(예: 필수 키가 누락된 경우). 또한, 구독자가 처리할 수 없는 사전 조건 실패로 인해 이벤트를 수신할 수도 있다(예: 구독자가 로컬 데이터베이스를 업데이트해야 하지만 데이터베이스가 아직 준비되지 않은 경우). 이러한 경우, 구독자는 브로커에게 이벤트를 삭제하거나 다시 큐에 넣어야 한다고 알리는 부정적 승인(negative acknowledgement)을 보내야 한다. 따라서 auto_ack=False로 설정하고 구독자가 수동 승인을 보내도록 변경하는 것이 더 안전하다.

다른 옵션은 처리량과 관련이 있다. RabbitMQ 소비자는 스트림의 역압(backpressure)을 완화하기 위해 구독자에서 이벤트를 처리하기 전에 미리 가져온다(prefetch). 무제한 미리 가져오기는 가장 높은 처리량을 허용하지만 구독자를 압도할 위험이 있다. 구독자가 이벤트를 승인하지 못하면 미리 가져온 이벤트가 구독자에 쌓이게 된다.

그림 2.8 구독자에서 이벤트를 미리 가져와 데이터 처리량 증가

미리 가져오기 수(prefetch count)는 일반적으로 처리량과 데이터 안전 요구 사항에 따라 실험적으로 결정된다. 우리의 경우, 60초마다 100개 미만의 이벤트만 수집하므로 높은 처리량이 필요하지 않다. 가장 보수적인 값인 1로 이 값을 설정할 것이다. 여기 새로운 이벤트 스트림에서 소비하고 데이터 안전성을 보장하기 위해 중복 검사를 통합한 업데이트된 FlightSubscriber가 있다.

Listing 2.14 이벤트 스트림을 사용하는 Flights 구독자
```python
import json
import pika
class FlightSubscriberV2:
def __init__(self, stream_name):
self.stream_name = stream_name
self.flights = {}
def check_duplicate(self, event):
if (
event["icao24"] in self.flights
and event["time_position"] <= self.flights[event["icao24"]]["time_position"]
):
return True
```

Real-Time Machine Learning_MEAP_v01_p46

Page 1


--- Page 1 ---
새로운 구독자를 실행해 보아라. 놀랍게도, 발행된 이벤트를 전혀 받지 못한다. 이제 발행자를 다시 실행해 보아라.

비행 업데이트를 받았다: {'icao24': '4ba975', 'origin_country': 'Turkey', 'time_position': 1729359358, 'longitude': 6.1143, 'latitude': 46.2395, 'velocity': 8.23, 'true_track': 45}

이번에는 구독자가 데이터를 받았지만, 단 하나의 이벤트만 받았다. prefetch_count를 2로 설정하고 구독자를 다시 실행해 보아라. 그런 다음, 발행자를 재시작하여 새로운 이벤트를 생성해 보아라.

비행 업데이트를 받았다: {'icao24': '4a3121', 'origin_country': 'Romania', 'time_position': 1729359386, 'longitude': 9.803, 'latitude': 47.734, 'velocity': 171.92, 'true_track': 268.11}

비행 업데이트를 받았다: {'icao24': '398579', 'origin_country': 'France', 'time_position': 1729359559, 'longitude': 9.7242, 'latitude': 46.6355, 'velocity': 217.26, 'true_track': 125.46}

이제 구독자는 두 개의 새로운 이벤트만 받았다. 무슨 일이 일어나고 있는지 이해하기 위해 이벤트의 순서를 고려해 보아라.

1. 발행자가 시작하여 브로커에 몇 가지 이벤트를 발행한다.
#A 중복 이벤트를 확인하기 위한 함수 콜백.
#B 이벤트를 소비할 때 최대 1개의 이벤트를 미리 가져온다.
#C 중복을 확인하기 위해 콜백을 구성한다.
#D 자동 확인을 비활성화한다.
#E 구독자를 실행하여 스트림에서 이벤트 소비를 시작한다.
self.flights[event["icao24"]] = event
return False
def process_message(self, channel, method, properties, body): #A
event = json.loads(body) #A
if not self.check_duplicate(event): #A
print(f"Received flight update: {event}") #A
def run(self):
connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
channel = connection.channel()
channel.queue_declare(
queue=self.stream_name,
durable=True,
arguments={"x-queue-type": "stream"}
)
channel.basic_qos(prefetch_count=1) #B
channel.basic_consume(
queue=self.stream_name,
on_message_callback=self.process_message, #C
auto_ack=False #D
)
channel.start_consuming()
subscriber = FlightSubscriberV2(stream_name="flight_events")
subscriber.run() #E

Real-Time Machine Learning_MEAP_v01_p47

Page 1


--- Page 1 ---
2. 구독자는 prefetch_count=1로 시작하고 이벤트를 받지 않는다.
3. 발행자가 재시작하고 브로커에 더 많은 이벤트를 발행한다. 구독자는 하나의 이벤트를 받는다.
4. 구독자는 prefetch_count=2로 재시작하고 이벤트를 받지 않는다.
5. 발행자가 재시작하고 브로커에 더 많은 이벤트를 발행한다. 구독자는 2개의 이벤트를 받는다.
여기서 구독자는 발행자가 재시작할 때만 새로운 이벤트를 소비한다. 이는 큐와 달리, 브로커에 이전에 발행된 메시지가 새로운 구독자에게 자동으로 전달되지 않기 때문이다. 대신, 새로운 구독자는 마지막으로 발행된 오프셋에서 스트림을 읽기 시작한다. 구독자가 이전 이벤트를 읽도록 하려면 x-stream-offset 옵션을 다른 값으로 설정해야 한다. 예를 들어, 구독자를 스트림의 시작부터 시작하도록 설정할 수 있다.

두 번째 질문은 왜 구독자가 3단계에서 하나의 이벤트만 받고 5단계에서 두 개의 이벤트를 받는가이다. 그 이유는 구독자가 이벤트를 인식하도록 설정하지 않았기 때문에 prefetch 큐가 가득 차기 때문이다. 스트림의 나머지를 소비하려면 구독자가 각 이벤트가 제대로 처리된 후 브로커에 인식해야 한다. 아래의 목록에 표시된 대로이다.

이 업데이트를 한 후, 구독자를 재시작하라. 이제 구독자가 모든 이벤트를 소비하는 것을 볼 수 있다. 모든 이벤트가 수신되었는지 확인하려면 마지막 몇 개의 이벤트를 확인하라. 이는 마지막 발행자 실행과 동일해야 한다. 또한 구독자 출력에서 이전과 동일한 이벤트를 소비하는지 확인할 수 있다.

Listing 2.15 스트림의 시작부터 소비하기
#A 이벤트를 소비할 때 첫 번째 오프셋에서 시작한다.
```python
channel.basic_consume(
queue=self.stream_name,
on_message_callback=self.process_message,
arguments={"x-stream-offset": "first"} #A
)
channel.start_consuming()
```

Listing 2.16 구독자로부터의 인식 전송
#A 이벤트가 성공적으로 처리되었음을 나타내는 이벤트 인식.
```python
def process_message(self, channel, method, properties, body):
event = json.loads(body)
if not self.check_duplicate(event):
print(f"Received flight update: {event}")
channel.basic_ack(delivery_tag=method.delivery_tag) #A
```

42
© Manning Publications Co. To comment go to liveBook

Real-Time Machine Learning_MEAP_v01_p48

Page 1


--- Page 1 ---
비행 업데이트 수신: {'icao24': '4a3121', 'origin_country': 'Romania', 'time_position': 1729359386, 'longitude': 9.803, 'latitude': 47.734, 'velocity': 171.92, 'true_track': 268.11}

비행 업데이트 수신: {'icao24': '4b18fc', 'origin_country': 'Switzerland', 'time_position': 1729359430, 'longitude': 8.5584, 'latitude': 47.4614, 'velocity': 0, 'true_track': 275.62}

비행 업데이트 수신: {'icao24': '70c0cd', 'origin_country': 'Oman', 'time_position': 1729359447, 'longitude': 8.5553, 'latitude': 47.4608, 'velocity': 0, 'true_track': 5.62}

비행 업데이트 수신: {'icao24': '398579', 'origin_country': 'France', 'time_position': 1729359559, 'longitude': 9.7242, 'latitude': 46.6355, 'velocity': 217.26, 'true_track': 125.46}

이벤트 스트림의 강력한 점 중 하나는 여러 발행자와 구독자로 확장할 수 있다는 것이다. 새로운 터미널 창을 열고 구독자의 새 복사본을 시작한 후 발행자를 다시 시작하여 이를 확인할 수 있다. 60초 후 발행자는 새로운 이벤트 시리즈를 생성하고 각 구독자는 동일한 출력을 출력하여 둘 다 새로운 이벤트를 소비했음을 나타낸다. 기본 메시지 큐에서는 이벤트가 소비되면 큐에서 제거되기 때문에 이는 불가능했다.

이벤트 기반 시스템은 다양한 이벤트 스트림을 통해 통신하는 많은 발행자와 구독자로 구성된다. 이벤트 스트림 자체는 때때로 주제(토픽)라고도 한다. 지금까지 실시간 비행 데이터를 캡처하는 단일 주제를 다루어 왔다. 그러나 실시간 머신러닝 시스템에서는 특정 데이터 클래스에 전념하는 여러 주제가 있을 것이다. 다음 섹션에서는 실시간 머신러닝 시스템을 구축하기 위해 주제 측면에서 데이터 워크플로를 구성하는 방법을 탐구할 것이다.

2.3 머신러닝을 위한 이벤트 기반 아키텍처의 이점 이제 데이터 수집 파이프라인을 구축하기 위해 이벤트 기반 아키텍처를 사용하는 방법을 설명했으므로, 머신러닝 애플리케이션을 구축하는 데 특히 적합한 이유에 대해 간단히 설명하겠다.

일반적으로 머신러닝 제품을 구축하는 접근 방식은 데이터 배치를 통해 모델을 훈련한 다음, 모델을 프로덕션 소프트웨어 애플리케이션에 통합하여 추론을 수행하는 것이다. 추론을 생성하는 방법에는 여러 가지가 있다. 한 가지 방법은 훈련 과정과 유사하게 배치 방식으로 오프라인에서 추론을 사전 계산하는 것이다. 즉, 정적이고 유한한 데이터 세트에서 추론을 생성하는 것이다. 이러한 상황에서는 시기적절함이 보통 중요한 요소가 아니다. 예로는 주택 가격 예측, 고객 생애 가치, 소매점의 상품 수요 예측 등이 있다.

그러나 시간이 지남에 따라 더 많은 활동이 온라인에서 이루어지고 있다. 불과 몇십 년 전만 해도 CD로 음악을 듣고 영화와 TV 프로그램의 DVD를 대여하거나 구매했다. 네트워크와 케이블 TV가 프로그램의 일정을 통제했고, 많은 금융 거래가 대면으로 이루어졌다. 오늘날 우리는 온라인으로 음악을 듣고 콘텐츠를 시청한다. Netflix와 같은 서비스에서 프로그램을 시청할 시간을 우리가 통제한다. 우리는 온라인으로 더 많은 금융 거래를 수행하고 Uber와 같은 차량 공유 앱을 호출하며 Doordash와 같은 온라인 배달 서비스를 통해 음식을 주문한다.

Real-Time Machine Learning_MEAP_v01_p49

Page 1


--- Page 1 ---
위 내용은 서비스 제공의 적시성에 대한 필요성이 증가하고 있음을 강조한다. 이는 실시간 애플리케이션에서 중요한 구성 요소인 기계 학습(machine learning)에도 영향을 미친다. 실시간으로 추론을 생성하는 것이 필수적인 많은 사용 사례가 있다. 이전 장에서 몇 가지 예를 다루었다. 혜택을 논의하기 전에, 현재 기계 학습 애플리케이션이 어떻게 배포되는지에 대해 이야기하겠다.

2.3.1 동기식 기계 학습
기계 학습 애플리케이션은 특수한 유형의 소프트웨어 애플리케이션이므로, 전통적인 소프트웨어 애플리케이션과 동일한 프레임워크에 기계 학습을 포함시키는 것이 합리적이다. 이는 일반적으로 기계 학습 모델과 새로운 데이터에 대한 추론을 생성하는 데 사용되는 코드인 추론 엔진이 마이크로서비스의 구성 요소로 제공된다는 것을 의미한다. 종종 이 마이크로서비스는 REST 애플리케이션의 일부이다. 이는 하나의 마이크로서비스가 기계 학습 마이크로서비스에 특징을 제공하여 요청을 하고, 기계 학습 마이크로서비스가 모델을 로드하고 추론을 생성하여 응답 형태로 추론을 반환하는 것을 포함한다. 그림 2.9는 이러한 상호작용의 예를 보여준다. 이 예에서는 고객이 레스토랑에 대한 리뷰를 게시할 수 있는 웹 애플리케이션이 있다. 리뷰는 백엔드 서비스로 전송되어 고객 리뷰 감정 예측기로 라우팅되며, 이는 리뷰의 감정(긍정적 또는 부정적)을 생성할 모델을 로드하고 이를 백엔드 서비스에 응답으로 반환하는 마이크로서비스이다. 응답을 받은 후, 리뷰와 감정은 고객 서비스 담당자가 고객 리뷰를 분류하는 데 사용하는 다른 애플리케이션인 고객 지원 포털로 전달된다.

Real-Time Machine Learning_MEAP_v01_p50

Page 1


--- Page 1 ---
그림 2.9 고객 리뷰 감성 분석 애플리케이션의 동기식 아키텍처 사용

위의 상호작용의 본질은 동기식이다. 요청을 제출하는 서비스는 응답을 받을 때까지 기다린 후 애플리케이션 워크플로의 다음 단계로 진행한다. 서비스가 응답을 받을 때까지 기다려야 한다는 사실은 애플리케이션에 약간의 지연을 초래하며, 이는 일부 사용 사례에서는 용납되지 않을 수 있다.

지연은 다른 방식으로도 시스템에 들어올 수 있다. 예측 요청이 예측 엔진에 과도하게 몰리면, 처리할 수 있는 양을 초과하여 역압(backpressure)을 발생시켜 응답이 느려진다. 일부 애플리케이션 설계자는 서버 부하를 관리하기 위해 요청 유입을 관리하는 속도 제한(rate limiting)을 적용할 수 있지만, 이는 서비스가 요청을 단순히 거부하게 된다는 것을 의미한다. 어느 상황도 이상적이지 않으며, 사용자 경험이 저하될 수 있다.

Real-Time Machine Learning_MEAP_v01_p51

Page 1


--- Page 1 ---
그림 2.10 고객 리뷰 감성 분석 애플리케이션의 동기식 아키텍처 사용

위의 내용은 특징을 전송하는 서비스와 추론을 생성하고 반환하는 서비스 간에 긴밀한 결합이 있음을 강조한다. 머신러닝 마이크로서비스의 부하는 얼마나 많은 요청이 전송되는지에 따라 영향을 받을 수 있으며, 요청하는 마이크로서비스의 성능은 머신러닝 마이크로서비스의 응답성에 의존한다.

이전에 제시된 고객 리뷰 애플리케이션은 머신러닝 마이크로서비스를 호출하는 애플리케이션이 추론을 받아 다른 애플리케이션에 전달하여 고객 서비스 담당자가 리뷰를 분류하는 시나리오를 보여준다. 만약 여러 애플리케이션에서 추론이 필요하다면 어떻게 될까? 예를 들어, 추론은 비즈니스 인텔리전스 목적으로 사용되는 분석을 표시하는 대시보드의 일부일 수 있다. 추론과 특징은 오프라인 분석을 위해 데이터베이스에 저장될 수도 있다. 워크플로우의 어느 부분에서 병목 현상이 발생하면, 그 영향은 전체 시스템이 실패하는 연쇄 반응을 일으킬 수 있다.

이 의존성에는 다른 문제도 있다. 머신러닝 마이크로서비스가 API 변경을 해야 한다면, 이에 의존하는 모든 다른 마이크로서비스도 코드를 변경해야 한다. 많은 조직에서 머신러닝 마이크로서비스를 개발한 팀은 이를 호출하는 팀과 다르다. 이들 팀은 일반적으로 다른 목표, 우선순위, 출시 일정을 가지고 있다. 그 결과는 많은 조직에서 발생하는 의사소통 문제로, 머신러닝 프로젝트가 실패할 수 있다.

Real-Time Machine Learning_MEAP_v01_p52

Page 1


--- Page 1 ---
이 모든 것은 중요한 점을 강조한다: 반응형 애플리케이션을 구축하기 위해서는 데이터에 대한 빠른 접근이 핵심이다. 데이터는 실시간 시스템에서 돈과 유사한 가치를 가진다. 오늘의 1달러가 미래의 1달러보다 더 가치가 있는 것처럼, 실시간 머신 러닝 애플리케이션에서 데이터가 더 빨리 이용 가능하고 처리되어 실행될수록 조직에 더 많은 가치를 제공한다. 예를 들어, 사용자 활동에 기반하여 실시간으로 추천을 조정할 수 있는 애플리케이션은 전날의 데이터로 학습되어 조정할 수 없는 애플리케이션보다 사용자를 더 잘 유지하고 다시 방문하게 할 가능성이 높다. 마찬가지로, 애플리케이션이 사기성 금융 거래를 더 빨리 감지하고 표시할 수 있을수록 금융 기관이 시간이 지남에 따라 더 많은 돈을 절약할 가능성이 높다.

2.3.2 이벤트 기반 머신 러닝
이 장의 앞부분에서 데이터 신선도 문제에 대한 해결책으로 실시간 데이터 흐름과 이벤트 기반 아키텍처의 개념을 소개했다. 그림 2.11은 이벤트 기반 아키텍처를 사용하는 애플리케이션의 예를 보여준다. 고객 리뷰 애플리케이션이 고객 리뷰 데이터를 고객 리뷰 주제로 전달한다. 머신 러닝 모델을 포함하는 고객 리뷰 감정 예측 애플리케이션은 새로운 고객 리뷰가 고객 리뷰 주제에 추가되면 즉시 추론을 생성하고 이를 고객 감정 주제에 게시한다. 고객 리뷰 감정 예측 애플리케이션은 하나의 주제를 구독하고(데이터를 수신) 다른 주제에 게시하는 "핸들러" 애플리케이션으로 간주할 수 있다.

그림 2.11 이벤트 기반 아키텍처를 사용하는 고객 리뷰 감정 분석 애플리케이션

Real-Time Machine Learning_MEAP_v01_p53

Page 1


--- Page 1 ---
이전 예시와의 주요 차이점은 고객 리뷰 데이터를 수집하는 고객 리뷰 애플리케이션이 고객 리뷰 감정 분석 애플리케이션과 독립적으로 작동한다는 것이다. 이 애플리케이션은 단순히 데이터를 발행하는 역할을 한다. 감정 분석 애플리케이션은 고객 리뷰 주제에 구독하고 주제에 저장된 데이터를 처리하여 고객 감정 주제에 추론 결과를 생성하고 발행하는 책임이 있다. 고객 리뷰 애플리케이션은 더 이상 추론 결과를 수집하고 모든 후속 작업을 수행할 필요가 없기 때문에 감정 분석 애플리케이션의 내부를 알 필요가 없다. 사실, 이 다이어그램은 다른 애플리케이션이 후속 작업을 처리하도록 설정할 수 있는 방법도 보여준다. 이 예시에서는 감정 분석 애플리케이션이 데이터를 발행하는 고객 감정 주제에 구독된 두 개의 애플리케이션이 있다. 하나는 고객 지원 포털을 운영하여 고객 지원 인력이 리뷰를 분류할 수 있도록 한다. 두 번째 애플리케이션은 고객 리뷰와 모델 출력을 분석하여 비즈니스 인텔리전스 목적으로 사용할 수 있는 분석 포털을 생성한다. 이 시나리오에서 다른 두 애플리케이션은 감정 분석 애플리케이션과 직접 대화하지 않는다.

이 아키텍처의 또 다른 이점은 주제의 구독자가 원하는 간격으로 데이터를 소비할 수 있다는 것이다. 따라서 구독자는 SLA가 즉각적인 처리를 요구하는 경우 실시간 이벤트를 선택할 수 있으며, 실시간으로 이벤트를 수집할 필요가 없는 경우 나중에 이벤트를 수집할 수 있다.

이벤트 기반 아키텍처는 애플리케이션 유지보수를 개선한다. 이러한 애플리케이션이 분리되어 있기 때문에 각 팀은 자체 유지보수 주기를 선택할 수 있으며, 코드 변경이 다른 팀에 영향을 미칠 걱정을 할 필요가 없다.

이벤트 기반 애플리케이션은 데이터 접근의 민주화를 통해 데이터 사일로 문제를 해결한다. 동기적 패러다임에서는 소비 애플리케이션이 데이터를 생성하는 애플리케이션의 API 사양을 따라야 한다. 그러나 애플리케이션이 데이터를 수신하기 위해 주제에 구독만 하면 되고 데이터 생성자를 호출하는 방법에 대해 걱정할 필요가 없다면, 애플리케이션 설계에 훨씬 더 많은 유연성이 생기고 소비 애플리케이션은 데이터 생성자와 다른 프로그래밍 언어로 작성될 수 있다. 이는 조직 내 여러 팀이 각자의 배경에 따라 데이터를 활용할 수 있는 가능성을 열어주며, 이는 조직에 더 많은 가치 흐름을 창출한다.

이벤트 기반 아키텍처는 데이터 접근을 방해하고 머신러닝 애플리케이션 개발을 지연시키는 또 다른 문제를 해결한다: 경험이 부족한 팀원이 데이터를 실수로 삭제할 수 있다는 두려움이다. 전통적인 관계형 데이터베이스와 달리, 데이터는 업데이트되거나 삭제될 수 없으며, 이벤트는 불변이며 단순히 주제에 추가된다. 이는 데이터에 대한 읽기 전용 접근을 제공하는 것과 유사하다. 소비 애플리케이션은 사용 사례에 따라 이벤트를 처리해야 한다. 또한, 특정 데이터 요소가 민감하고 제한이 필요한 경우, 민감한 요소를 마스킹하거나 제거하고 이벤트를 다른 주제에 발행하여 조직 내 누구나 사용할 수 있도록 하는 애플리케이션을 생성할 수 있다. 반면, 관계형 데이터베이스에서는 지속적으로 업데이트해야 하는 복잡한 행/열 기반 데이터베이스 접근 정책이 필요하거나, 여러 사용자를 위해 동일한 데이터 소스의 여러 복사본을 생성하고 유지보수해야 한다.

Real-Time Machine Learning_MEAP_v01_p54

Page 1


--- Page 1 ---
이벤트 기반 아키텍처가 애플리케이션 유지보수를 어떻게 더 잘 지원하는지 간단히 언급하였다. 이제 이를 머신러닝의 맥락에서 확장하여 설명할 것이다. 머신러닝 애플리케이션을 유지보수하는 것은 소프트웨어 애플리케이션을 유지보수하는 것보다 훨씬 복잡하다. 예를 들어, 모델 결과의 재현성은 머신러닝 모델을 관리하는 데 중요한 측면이다. 데이터는 일반적으로 최신 스냅샷만 저장하는 관계형 데이터베이스에 저장된다. 이는 머신러닝 모델을 훈련하는 데 사용된 데이터를 캡처하고 버전 관리하는 솔루션을 설계하는 것이 간단한 작업이 아님을 의미한다. 이벤트 스트리밍 플랫폼에서는 이벤트 내의 데이터가 불변이며 이러한 이벤트는 완전히 순서가 정해져 있다. 이는 애플리케이션이 모델 훈련에 사용된 데이터의 시작과 끝 오프셋(offset)만 저장하면 된다는 것을 의미한다. 아래의 그림 2.12는 모델의 v1이 오프셋 0에서 5까지의 데이터로 훈련되었고, v2 버전의 모델이 오프셋 6에서 9까지의 데이터로 훈련된 예를 보여준다.

그림 2.12 오프셋을 사용하여 모델 훈련 추적

이 기능의 하위 이점은 서로 다른 모델 버전을 비교하는 것이 훨씬 간단해진다는 것이다. 데이터가 항상 사용 가능하기 때문에 사후 분석이 필요할 때 더 쉽게 수행할 수 있다. 예로는 모델 매개변수, 구성, 평가 지표, 데이터 분포, 특징과 목표 간의 관계, 특징 중요도 등을 비교하는 것이 포함된다. 한 걸음 더 나아가 훈련 데이터와 추론에 사용된 데이터를 비교하여 운영 중 문제를 해결할 수 있다. 실제로 훈련에 사용된 특징과 추론에 사용된 특징이 동일한 토픽에 저장되면, 동일한 코드를 사용하여 훈련 및 추론 데이터를 수집할 수 있어 머신러닝 애플리케이션을 배포하는 데 걸리는 시간이 줄어든다.

CPU 및 메모리 사용량과 같은 애플리케이션 상태 메트릭을 캡처하고 분석하는 것도 관련 토픽에 이벤트로 게시될 때 더 쉬워진다. 이는 예기치 않은 문제가 발생할 때 실시간 경고를 받을 수 있는 가능성을 열어준다.

Real-Time Machine Learning_MEAP_v01_p55

Page 1


--- Page 1 ---
기계 학습 응용 프로그램의 또 다른 구성 요소는 모델 평가와 재훈련을 위한 사용자 피드백 수집이다. 그림 2.13은 프론트 엔드 애플리케이션이 추론 주제에 구독하고 모델의 결과를 표시하는 예를 보여준다. 모든 부정적인 고객 리뷰를 포함하는 애플리케이션이라고 가정하자. 고객 서비스 담당자가 분류해야 하는 리뷰이다. 각 결과 옆에는 "정확함" 또는 "부정확함"이라는 버튼이 있어 고객 서비스 담당자가 선택적으로 클릭하여 피드백을 제공할 수 있다. 이 피드백은 "피드백" 주제에 캡처되어 모델 성능을 평가하고 재훈련을 위한 데이터로 사용될 수 있다.

그림 2.13 이벤트 기반 아키텍처를 사용하여 사용자 피드백 캡처하기

위의 예는 이벤트 기반 아키텍처가 배치 기계 학습 모델을 사용하는 반응형, 저지연 실시간 애플리케이션을 구축하는 것을 어떻게 더 쉽게 만드는지를 보여준다.

이벤트 기반 아키텍처가 실시간 기계 학습 모델을 사용하는 애플리케이션을 구축하는 데 중요한 구성 요소라는 주장도 무리가 아니다. 설계상, 실시간 기계 학습 모델은 먼저 예측을 하고, 레이블이 사용 가능할 때 학습한다. 실시간 맥락에서는 레이블이 언제 사용 가능할지 제어할 수 없다. 기계 학습 모델이 특정 사용자에게 기사 세트를 제시하기로 결정하는 예를 고려해보자. 이 사용 사례의 레이블은 사용자가 이 기사를 관련성이 있다고 판단했는지 여부이며, 관련성을 판단하는 방법은 사용자가 기사를 클릭했는지 여부이다. 사용자 클릭은 이 클릭 이벤트와 기사를 "레이블" 주제에 저장하는 프로세스를 시작할 수 있는 "이벤트"이다. 다른 쪽 끝에는 이 정보를 기계 학습 모델에 전달하는 구독자가 있을 수 있다. 또한 사용자의 페이지에 기사가 있는 시간을 모니터링하고 사용자가 일정 시간 내에 기사에 대해 아무런 조치를 취하지 않으면 "관련 없음" 이벤트를 "레이블" 주제에 게시하는 애플리케이션도 있을 수 있다. 이 모든 것은 아래 그림 2.14에 제시되어 있다.

Real-Time Machine Learning_MEAP_v01_p56

Page 1


--- Page 1 ---
그림 2.14 실시간 머신러닝 애플리케이션 워크플로우

위의 예시에서 보듯이, 전통적인 소프트웨어 아키텍처 패턴은 마이크로서비스 간의 긴밀한 결합을 포함하여 실시간 애플리케이션에 적합하지 않다. 반면에 이벤트 기반 아키텍처는 데이터 수집, 추론 생성, 메트릭 수집, 사용자 피드백과 같은 다양한 작업을 원활하게 조정하여 낮은 지연 시간과 반응성이 뛰어난 머신러닝 애플리케이션을 구축할 수 있게 한다.

이 장에서는 메시지 큐잉 솔루션을 사용하여 이벤트 스트림에서 데이터를 수집하는 방법을 배웠다. 또한 실시간 데이터를 생성하는 퍼블리셔(publisher)와 실시간 데이터를 소비하는 구독자(subscriber)를 만드는 방법도 배웠다. 3장에서는 이벤트 스트림의 데이터를 사용하여 실시간으로 추론을 생성할 뿐만 아니라 데이터가 사용 가능해지자마자 지속적으로 학습하는 나우캐스팅 모델(nowcasting model)을 구축하는 방법에 집중할 것이다.

2.4 요약

실시간 머신러닝은 과거 데이터와 실시간 데이터 모두에 대한 접근이 필요하다.
이벤트는 특정 시간에 무언가가 발생했다는 기록이다.
폴링 간격은 머신러닝을 위해 캡처되는 실시간 데이터의 양에 영향을 미친다.
메시지 큐는 서로 다른 프로세스 간의 간단한 비동기 통신을 가능하게 한다.
퍼블리셔 또는 프로듀서(producer)는 이벤트를 생성하는 프로세스이다.
구독자 또는 소비자(consumer)는 이벤트를 구독하는 프로세스이다.
동기 데이터 전송은 두 당사자 간의 직접적인 조정을 필요로 한다.
비동기 데이터 전송은 게시된 데이터를 저장할 버퍼가 필요하다.
백프레셔(backpressure)는 구독자가 퍼블리셔보다 이벤트를 더 느리게 처리하여 전체 데이터 처리량을 감소시키는 현상이다.
최소 한 번 전달(at least once delivery)은 모든 메시지가 최소 한 번 전달되도록 보장하지만 중복 메시지가 전송될 수 있다.
최대 한 번 전달(at most once delivery)은 중복을 방지하지만 메시지가 누락될 수 있다.
이벤트 스트림은 이벤트의 추가 전용 순서 로그이며 때때로 토픽(topics)이라고도 한다.
이벤트 브로커(event broker)는 데이터를 이벤트 스트림에 지속시켜 많은 퍼블리셔와 구독자 간의 이벤트 전달을 조정한다.

Real-Time Machine Learning_MEAP_v01_p57

Page 1


--- Page 1 ---
구독자는 데이터를 처리하기 전에 이벤트를 미리 가져와 데이터 처리량을 증가시킬 수 있지만, 이는 구독자를 과부하 상태로 만들 수 있다. 이벤트 스트림은 실시간으로 소비되거나 과거 데이터를 조회할 수 있다. 이벤트 기반 아키텍처는 데이터 수집이 추론 및 사용자 피드백 수집과 같은 다른 중요한 머신러닝 프로세스와 동시에 작동할 수 있게 한다.

반응형

'Study > Real-Time Machine Learning' 카테고리의 다른 글

1. Intro: Real-time 기계 학습  (0) 2025.03.06
반응형

이 장에서는 다음을 다룹니다:

 

  • 실시간 데이터란 무엇인가?  
  • 오프라인 학습과 온라인 학습의 차이  
  • 온라인 학습의 일반적인 사용 사례  

실시간 머신러닝(때때로 온라인 학습이라고도 함)은 실시간 데이터를 사용하여 환경의 변화에 적응하는 예측 시스템을 구축하는 접근 방식이다. 이는 역사적인 데이터 세트를 신중하게 선별하여 학습과 평가를 수행하는 배치 방식의 머신러닝(또는 오프라인 학습)과는 다르다. 오프라인 학습의 근본적인 가정은 모델이 운영되는 동안 입력 특성에 지속되는 어떤 참된 값이 있다는 것이다. 실제로 데이터의 통계적 속성, 예를 들어 확률 분포나 특징 간의 관계는 시간이 지남에 따라 변경될 가능성이 크다. 이러한 변화는 데이터 드리프트(data drift)로 알려져 있으며, 머신러닝 모델의 정확성을 감소시킬 수 있는데, 이는 원래 다른 통계적 특성을 가진 데이터로 학습되었기 때문이다. 오프라인 모델은 이러한 정확도 감소를 피하기 위해 정기적으로 재학습되어야 한다. 그러나 이러한 모델을 재학습하는 것은 대규모 데이터 세트를 여러 번 반복 처리해야 하기 때문에 비용이 많이 들고 시간이 많이 소요되기도 한다. 이 모델들이 운영 환경에 배포될 때쯤에는 이미 더 이상 사실이 아닌 데이터 가정에 기반하여 작동할 수 있다. 즉, 오프라인 모델은 실제 환경에서 발생하는 데이터 변화를 적응할 수 없다.

 

실시간 기계 학습 모델은 환경에서 데이터를 지속적으로 수신하여 즉석에서 배우도록 설계되어 있습니다. 대용량 데이터셋에 대해 훈련되는 대신, 실시간 기계 학습 모델은 새로운 데이터가 제공됨에 따라 점진적으로 학습됩니다. 온라인 학습의 개념은 새로운 것이 아니지만, 데이터 중심의 현대 조직에게 점점 더 중요한 의미를 가집니다. 세계가 점점 더 실시간으로 변하면서, 사기 탐지 및 사용자 추천과 같은 애플리케이션은 빠르게 변화하는 데이터 트렌드에 대응하기 위해 더 많은 실시간 학습 기술을 통합할 필요가 있습니다.

이 책의 목표는 실용적인 실시간 기계 학습 시스템을 구축하기 위한 논리와 도구를 제공하는 것입니다. 우리는 Python을 사용하여 처음부터 몇 가지 개념 증명 온라인 학습 시스템을 구축함으로써 이를 달성할 것입니다. 이 책의 모든 예시는 단일 기기에서 실행할 수 있습니다.

이 장에서는 기계 학습과 관련하여 "실시간" 데이터가 무엇을 의미하는지 탐구하고, 오프라인 학습과 온라인 학습을 비교하며, 온라인 학습의 가장 일반적인 사용 사례를 소개할 것입니다.


1.1 실시간 데이터란 무엇인가?


일반적인 하루 동안 소비하는 모든 데이터를 잠시 생각해 보세요. 일반적인 하루는 소셜 미디어에서 게시물을 스크롤하고, 이메일을 확인하며, 통근 중에 뉴스를 듣고, 쇼핑하면서 대화의 일부를 듣거나, 스트리밍 서비스를 통해 TV 프로그램을 시청하는 것일 수 있습니다. 이 중에서 어떤 데이터가 역사적이고 어떤 데이터가 진정한 실시간일까요? 차이를 정의하는 한 가지 방법은 지연에 대한 인식입니다. 받은 이메일은 수신자가 읽을 기회가 있을 때까지 "받은 편지함"에 저장됩니다. 그래서 대부분의 경우 이들은 역사적인 데이터의 예입니다. 반면, 화상 통화는 낮은 대기 시간의 상호작용이 필요하므로 일반적으로 실시간으로 간주합니다.

실제로, 화상 회의 및 기타 네트워크 지원 애플리케이션은 수 밀리초 단위의 데이터 전송 및 처리 지연을 흔히 포함합니다. 지구의 반대편 간 광섬유 전송의 최소 지연 한계는 L = 거리 / 속도 = 20,000 km / 205,000 km/s = 약 98 ms입니다. 미니애폴리스, MN에서 시드니, 호주에 있는 친구에게 메시지를 보내려면 14,480 km / 205,000 km/s = 약 72 ms가 걸리며, 다시 메시지를 받는 데 또 다른 72 ms가 걸립니다. 실질적으로는 지역 서버 또는 콘텐츠 전송 네트워크(CDN)를 사용하여 이를 완화합니다. 이러한 전략은 일관성을 대가로 데이터 복제 양을 일반적으로 증가시킵니다. 실시간 애플리케이션을 설계할 때 엔지니어는 목표 사용 사례에 얼마나 많은 지연이 허용되는지를 결정해야 합니다.

모든 실시간 애플리케이션에는 0이 아닌 지연이 존재하기 때문에, 생산 환경에서의 진정한 실시간 데이터는 달성 불가능한 이상입니다. 이 책에서는 실시간 데이터를 움직이는 데이터 (data in motion)로 정의합니다. 이는 데이터가 여전히 수신자에게 유용한 한 몇 초 또는 몇 분이 지난 데이터도 실시간으로 간주될 수 있음을 의미합니다. 사용 사례에 따라 수신자는 여러 엔티티 중 하나일 수 있습니다.

 

  • 데이터를 변환하는 스트림 프로세서
  • 데이터 인스턴스를 받아 예측을 생성하는 추론 엔진
  • 사람들을 위해 데이터를 렌더링하는 라이브 대시보드

이 정의에 대한 중요한 부가 요소는 모든 데이터가 실시간 데이터로 시작된다는 것이다. 파일 시스템, 데이터 레이크, 또는 데이터베이스와 같은 어떤 저장 형태에 저장되어 이후 검색될 때에는 실시간 데이터가 아니게 된다. 본서에서는 이를 데이터 저장 상태 또는 역사적 데이터라고 한다. 실시간 데이터와 역사적 데이터의 주요 차이점은 실시간 데이터가 적시성과 관련된 응용 가치가 있다는 것이다.

다음 그림에 나타나 있는 실시간 응용 프로그램을 고려해 보자. 이메일 서버는 이메일을 수신하고 이를 온라인 스팸 분류 모델을 훈련하는 데 사용한다. 온라인 모델은 이메일이 스팸인지 아닌지를 실시간으로 결정한다. 스팸 이메일은 즉시 삭제되며, 비스팸 이메일은 데이터베이스에 저장된다. 이메일 클라이언트는 수신 이메일을 알림으로 처리하고, 과거 이메일을 데이터베이스에서 검색할 수 있는 능력을 가진다. 역사적 데이터와 실시간 데이터 스트림 모두 이 응용 프로그램에 존재한다. 수신 이메일은 실시간 데이터로 처리되는 동안 데이터베이스에 저장될 때까지 실시간 데이터로 간주된다. 이메일이 데이터베이스에서 로드되면 역사적 데이터로 간주된다.

그림 1.1 실시간 및 역사적 데이터 스트림을 가진 이메일 알림 시스템

요컨대, 역사적 데이터는 모든 데이터가 한 번에 알려지는 오프라인 학습 방법에 사용된다. 실시간 데이터는 새로운 데이터가 사용 가능할 때마다 점진적으로 학습되는 온라인 학습 방법에 사용된다. 이러한 용어는 학습이 어디서 이루어지는지도 설명한다. 오프라인 학습은 개발 또는 스테이징 환경에서 이루어진다. 온라인 학습은 실제 환경에서 직접 이루어진다. 다음 섹션에서는 이 두 가지 접근 방식을 논의할 것이다.

1.2 오프라인 학습

전통적인 기계 학습은 대량의 데이터를 수집하고 모델을 데이터에 맞추기 위한 여러 훈련 단계를 수행하는 것을 포함한다. 이 접근 방식은 오프라인 학습이라고 불리며, 모델 훈련은 생산 환경과 완전히 독립된 환경에서 이루어지며, 역사적 데이터를 사용하여 미래를 모델링한다. 오프라인 기계 학습 주기는 그림 1.2에 나타나 있다.

그림 1.2 모델의 오프라인 학습과 운영 환경에서의 예측

 

오프라인 학습은 데이터 수집 단계로 시작됩니다. 데이터는 하나 이상의 데이터 소스에서 로드되어 지속적인 저장을 위한 형식으로 직렬화됩니다. 오프라인 학습을 위한 데이터는 종종 비구조화된 형태로, 사전에 정의된 스키마가 없습니다. 텍스트는 인터넷과 고유 문서에서 널리 접근 가능하며 PDF 및 HTML과 같은 파일 형식에서 쉽게 추출할 수 있기 때문에 흔한 비구조화 데이터입니다. 머신 러닝을 위해 데이터를 보다 정규화된, 수치 형식으로 변환하기 위한 전처리 단계가 일반적으로 적용됩니다. 예를 들어, 텍스트를 벡터 임베딩(embedding)으로 변환하거나 문자열에서 수치 또는 범주형 값을 구문 분석합니다.

데이터 수집 후에는 출력 레이블과 상관 있는 특징(feature) 집합을 발견하는 것이 목표인 특징 공학 (Featuring Engineering) 단계가 이어집니다. 특징 공학은 반복적인 과정으로, 과학이라기보다는 예술에 가깝습니다. 이는 예측력을 지닌 특징을 식별하기 위해 통계적 및 시각적 분석, 그리고 문제에 대한 직관을 포함합니다.

훈련 단계에서는 데이터셋을 훈련, 테스트, 검증 세트로 구분합니다. 이 세트들은 머신 러닝 모델이 훈련 및 추론에 사용할 수 있도록 동일한 데이터 스키마를 가집니다. 모델에 필요한 데이터 포인트의 수는 모델의 복잡성에 따라 달라집니다. 예를 들어, 선형 모델은 특징과 레이블 간의 선형 관계를 찾으려 하기 때문에 검색 공간은 특징의 수에 직접적으로 의존합니다. 인공 신경망과 같은 더욱 복잡한 모델은 훨씬 더 많은 수의 학습 가능한 파라미터를 가질 수 있습니다. 특징의 수를 증가시키면 모델의 예측 가능성이 향상될 수 있지만, 이는 입력 공간을 충분히 설명하기 위해 요구되는 데이터 포인트의 수도 증가시킵니다. 이를 차원의 저주(curse of dimensionality)라고 부릅니다.

 

기계 학습은 매개변수 p를 최적화하여 훈련 세트에 대한 손실을 최소화하는 것을 목표로 하는 최적화 문제로 정의됩니다. 예측된 출력은 어떤 매개변수 집합에서도 계산할 수 있습니다. 이는 p를 기준으로 총 훈련 손실을 나타내는 손실 함수 L이 정의될 수 있음을 의미합니다. 이는 문제의 목표를 설정하기 때문에 목표 함수라고도 합니다: 손실을 최소화하는 것입니다. 예를 들어, 제곱 오차 손실은 각 개별 예측 오류의 제곱을 더하여 수량화됩니다.



모델의 복잡성과 입력 특징의 수에 따라, 모델은 손실을 최소화하기 위해 조정 가능한 여러 개 또는 수백 개의 매개변수를 가질 수 있습니다. 여러 매개변수를 가진 모델은 매개변수 공간을 효율적으로 탐색할 수 있어야 합니다. 일반적인 방법론은 경사 하강법(gradient descent)으로, 훈련 가능한 매개변수에 대해 손실 벡터(gradient)를 계산하고 벡터의 반대 방향으로 매개변수를 조정하려고 시도합니다. 각 훈련 단계에서 매개변수는 다음 공식을 사용하여 업데이트됩니다.

 

최적화 방법은 모델이 최적의 해에 도달하는 속도나 해를 찾는지에 영향을 미칠 수 있기 때문에 중요한 선택입니다. 현대의 기계 학습 라이브러리 예를 들어, scikit-learn은 주어진 훈련 세트에 대해 다양한 종류의 모델을 훈련하기 위한 유용한 추상화를 제공합니다. 다음은 scikit-learn을 사용하여 고전적인 아이리스 샘플 데이터 세트에 대해 LogisticRegression 모델을 훈련하고, 수치 입력 특징에 기초하여 클래스 라벨을 예측하는 예제를 제공합니다.

 

Listing 1.1 Supervised Offline Learning in Python

 from sklearn import datasets
 from sklearn.metrics import accuracy_score
 from sklearn.linear_model import LogisticRegression
 from sklearn.model_selection import train_test_split
 iris = datasets.load_iris()                                       #A
 X = iris.data                                                     #A
 y = iris.target                                                   #A
 X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, 
random_state=42) #A
 model = LogisticRegression(random_state=42).fit(X_train, y_train)    #B
 y_pred = model.predict(X_test)                                    #C
 print(“Predicted values: “, y_pred)                               #C
 print(“True values: “, y_test)                                    #C
 print(“Accuracy: “, accuracy_score(y_test, y_pred)                #C

#A 샘플 iris 데이터셋 로드
#B 훈련 집합에 로지스틱 회귀 모델 훈련
#C 테스트 집합에 대한 모델 평가

 

다음은 이 코드 실행의 결과입니다.  

Predicted values:  [1 0 2 1 1 0 1 2 1 1 2 0 0 0 0 1 2 1 1 2 0 2 0 2 2 2 2 2 0 0]
True values:  [1 0 2 1 1 0 1 2 1 1 2 0 0 0 0 1 2 1 1 2 0 2 0 2 2 2 2 2 0 0]
Accuracy:  1.0


평가 단계는 모델을 스테이징 환경에 로드하고 광범위하게 테스트하는 것을 포함합니다. 이 단계는 머신 러닝 애플리케이션 개발에서 흔한 실패 지점입니다. 모델은 샘플 외 데이터에서 성능이 낮거나 예상치 못하거나 이해 관계자에게 불충분한 결과를 생성할 수 있습니다. 전자는 대개 데이터 문제입니다. 훈련 집합의 높은 분산으로 인해 과적합되거나 낮은 분산으로 인해 부족하게 적합되어 실제 데이터 포인트에 일반화되지 않을 수 있습니다. 후자의 문제는 엔지니어링 팀과 이해 관계자 간의 투명성과 의사소통 부족에서 기인할 수 있습니다.

배포 단계는 모델을 프로덕션으로 올려 최종 사용자가 접근할 수 있게 하는 기술적 과업입니다. 성공적인 배포는 주요 성과 지표나 사용자로부터의 인간 피드백을 수집하는 메커니즘을 포함합니다. 배포는 오프라인 학습 사이클의 끝을 표시합니다. 오프라인 학습 사이클은 반복적인 프로세스라는 점에서 소프트웨어 개발 수명 주기(Software Development Life Cycle ,SDLC) 와 유사합니다. 비즈니스 목표를 따라잡기 위해 모델은 주기적으로 재학습되어야 합니다.

 

오프라인 학습 워크플로는 완전히 동기식으로, 워크플로의 각 단계는 이전 단계가 완료되어야 진행할 수 있다. 예를 들어, 모형 훈련은 모든 훈련 데이터가 수집되기 전까지는 진행될 수 없다. 이는 오프라인 모형이 일반적으로 모든 데이터 포인트에 한 번에 접근할 필요가 있기 때문이다. 이러한 구조는 새로운 모형의 배포 속도를 제한한다. 배포 속도는 또한 모형 훈련 시간에 의해 제한된다. 각 훈련 단계에서 모형은 처음부터 다시 훈련된다. 훈련 시간은 모형의 복잡성과 훈련 데이터 크기에 영향을 받는다. 오프라인 모형의 정확도를 개선하는 방법 중 하나는 훈련 세트 크기를 늘리는 것이므로, 훈련 시간은 일반적으로 각 사이클에서 선형적으로 증가한다.

오프라인 학습은 과거 데이터를 이용해 현재 문제를 해결하려는 조직에게 자연스러운 출발점이지만, 이러한 시스템을 유지하기 위해 지속적인 노력이 필요하다. 기본 데이터 경향이 더 빠르게 변화하는 경우, 훈련된 모형이 관련성을 유지하고 유용하게 만들기 위해 반응형 학습 접근이 필요하다. 다음 섹션에서는 이러한 문제를 해결하기 위해 온라인 학습의 개념을 소개한다.

1.3 온라인 학습  

온라인 학습은 데이터의 급변하는 상태에 대응하는 실시간 학습 접근이다. 오프라인 학습에서처럼 대규모 데이터 집합을 학습하는 대신, 온라인 학습은 미니 배치나 단일 인스턴스의 데이터를 사용하여 점진적으로 학습한다. 앞서 설명한 바와 같이, 오프라인 학습 워크플로는 동기적인 반면, 온라인 학습 워크플로는 비동기적이다. 데이터 수집부터 특징 엔지니어링, 훈련, 예측 생성까지 각 구성 요소가 서로 독립적으로 작동한다. 온라인 학습은 사람의 인터넷 기반 학습과 혼동을 피하기 위해 점진적 학습이나 실시간 학습이라고도 불린다.


1.3.1 모형 드리프트 문제  

초기의 온라인 학습 시스템은 필요에 의해 만들어졌지만, 현대의 온라인 학습 접근은 모형 드리프트라는 특정 데이터 문제를 해결하기 위해 설계되었다. 일반적으로 모형 드리프트는 모형이 구식이 되는 결과로 발생한다. 드리프트는 데이터를 수집하고 모형을 더 빠르게 재훈련하여 완화할 수 있다. 그러나 배치 모형을 재훈련하는 것은 데이터셋이 커짐에 따라 비용이 많이 들 수 있다. 온라인 학습은 가능한 한 재훈련 시간을 줄임으로써 드리프트 문제를 해결한다. 대규모 데이터 집합을 오프라인에서 수집하는 대신, 새로 수신한 데이터 포인트를 즉시 사용하여 모형을 업데이트한다. 이는 데이터 분포의 변화에 더 빠르게 적응하고 외부 변화에 반응할 수 있는 모형을 생성한다.

 

모델 드리프트는 시간에 따라 머신러닝 모델의 예측력이 저하되는 현상을 말한다. 이는 보통 모델이 더 이상 유효하지 않은 데이터 가정을 바탕으로 학습되었기 때문이다. 머신러닝 모델을 학습시킬 때, 인구 집단 X에서 출력 레이블 Y를 잘 예측할 수 있을 것으로 보이는 특징 집합을 선택한다. 예를 들어, 붓꽃 분류 문제에서 특징 집합 X는 꽃잎의 길이, 꽃잎의 너비, 꽃받침의 길이, 꽃받침의 너비이고, 출력 레이블 Y는 붓꽃의 종(setosa, virginica, versicolor)이다. 가능한 모든 입력과 출력을 알 수는 없지만, 이들 간의 관계를 확률로 설명할 수 있다. P(X)가 입력 특징의 확률 분포를 나타내고 P(Y)가 출력 레이블의 분포를 나타낸다면, 특징과 레이블이 함께 나타날 확률을 결합 확률 P(X,Y) = P(X)P(Y|X)로 표현할 수 있다. 이는 모델 드리프트에는 두 가지 주요 구성 요소가 있음을 의미한다: 입력 특징의 분포 P(X)와 특징이 주어졌을 때의 레이블 확률 P(Y|X)이다.

데이터 드리프트 또는 특징 드리프트는 입력 특징의 분포 P(X)가 시간에 따라 변화하는 경우를 설명한다. 예를 들어, 이메일 길이에 따라 스팸 여부를 분류하는 스팸 분류기를 생각해보자. 초기 학습 세트에서는 이메일 길이가 40-60자 사이로 다양하다. 그러나 모델이 실제 환경에 배포된 후에는 훨씬 더 긴 이메일을 관찰하기 시작한다. 모델이 100자 이상의 이메일에 대해서는 충분히 학습되지 않았기 때문에 이러한 긴 이메일을 정확히 분류하는 데 어려움을 겪을 수 있다. 이 시나리오는 특징 분포의 변화로 모델의 성능이 시간에 따라 저하되는 것을 설명한다. 특징 변화는 현실을 제대로 대표할 이메일을 학습 세트에 포함하지 못한 결과일 수도 있다. 그러나 외부 요인, 예를 들어 마케팅 전략의 변화에 의해서도 발생할 수 있다. 아래 그림은 이러한 특징 드리프트를 보여준다.

그림 1.3 모델이 실제 환경에 배포된 후 이메일의 평균 길이가 크게 증가하는데, 이는 특징 드리프트의 징후이다. 이는 모델이 훨씬 더 짧은 이메일을 기반으로 학습되었기 때문에 덜 정확한 예측을 초래할 수 있다.

 

개념 드리프트(concept drift)는 시간이 지남에 따라 입력 특징과 출력 레이블 간의 관계가 변하는 것을 말합니다. 수학적으로 이는 P(Y|X)가 시간이 지나면서 어떻게 변화하는지를 나타냅니다. 배치 모델(batch models)은 P(Y|X)가 고정되어 있을 때 잘 작동합니다. 그러나 현실 세계에서 예측하려는 출력은 외부 요인에 의해 영향을 받습니다. 이는 자연재해나 정전과 같은 일시적인 사건 때문일 수도 있고, 소비자 행동 및 시장 변화와 같은 장기적인 추세에 의해 발생할 수도 있습니다. 예로는 2020년 코로나바이러스 대유행이 있으며, 이로 인해 손 소독제나 면역 비타민과 같은 특정 제품에 대한 소비자 수요가 증가하였습니다. 이는 데이터 특징과 소비자 구매 간의 관계가 갑자기 변화하여 매출 예측 모델의 정확성에 영향을 미쳤을 가능성이 큽니다. 2020년 이전에 훈련된 모델은 손 소독제 판매가 학기 시작과 같은 계절적 경향 외에는 비교적 일정하다고 가정할 수도 있습니다. 이러한 모델은 학습된 관계가 더 이상 유효하지 않으므로 재훈련이 필요합니다.

특징 드리프트(feature drift)와 개념 드리프트(concept drift)의 차이는 아래 그림에서 설명됩니다. 머신 러닝 모델은 초기 데이터 점 집합에 맞추어져 두 출력 클래스를 구별하는 결정 경계를 해결합니다. 특징 드리프트는 특징 공간에서 데이터 포인트를 변화시켜 모델의 정확성에 영향을 미칠 수 있습니다. 개념 드리프트는 결정 경계 자체를 변화시키며, 이는 모델을 무효화할 가능성이 더욱 큽니다. 이는 반드시 특징이 드리프트했다는 것을 의미하지는 않습니다. 예를 들어, 날씨를 사용하여 소매 판매를 예측하는 모델은 날씨가 좋음에도 불구하고 소비자가 집에 머무는 전 세계적 봉쇄로 인해 영향을 받을 것입니다.

그림 1.4 특징 드리프트는 특징의 분포를 변화시키는 반면, 개념 드리프트는 문제의 결정 경계를 변화시킵니다.

1.3.2 온라인 학습 사이클

온라인 학습은 실시간 스트림에서 새로운 데이터 인스턴스를 수신함으로써 시작된다. 데이터 인스턴스는 역사적 데이터셋의 단일 샘플에 해당하며, 현실 세계에서 관찰된 특징들의 집합을 포함한다. 모델은 데이터의 특성에 대한 지식이 거의 없는 상태에서 초기화된다. 이 모델을 사용하여 예측을 계산하고 이를 애플리케이션의 다운스트림 부분으로 전달한다. 시스템에 레이블이 도착하면, 이를 모델에 제공하여 모델이 레이블에서 학습하고 매개변수를 업데이트한다. 이 시점에서는 출력의 두 가지 버전이 있다. 하나는 온라인 모델이 계산한 예측 값이고, 다른 하나는 현실 세계에서 관찰된 참값이다. 모델을 교육하는 아이디어는 예측 값과 관찰 값 간의 손실을 통해 학습하는 것이다.

그림 1.5는 온라인 학습 워크플로우의 예를 보여준다. 이는 작업의 순서를 반영하는데, 즉 샘플에 대해 먼저 예측을 하고, 레이블이 사용 가능해진 후 동일한 샘플에 대해 모델을 학습시킨다. 그러나 오프라인 머신러닝 워크플로우와 달리, 이러한 작업은 실시간 데이터 처리의 특성상 비동기적으로 진행된다. 우리는 언제 레이블을 받을지 미리 알 수 없으므로, 이러한 작업은 실시간 데이터 처리에 더 적합한 스트림 처리 엔진을 활용하게 된다. 실시간 데이터 처리에 대한 내용은 2장에서 탐색할 것이다.

그림 1.5 Training a model “online” while using it for predictions


이전 섹션에서는 오프라인 모델이 목표 함수(objective function)를 최소화하는 매개변수 집합을 찾아 학습되는 방법을 설명했다. 오프라인 학습과 달리, 전체 데이터셋은 학습 시점에 사용 가능하지 않기 때문에 총 손실을 계산할 수 없다. 그러나 손실 함수가 합성 가능하다면, 예를 들어 제곱 오차의 합이라면 개별 항목으로 분해될 수 있다. 이는 아래에 조정된 공식으로 개별 데이터 샘플에 대한 그라디언트 보정을 계산할 수 있음을 의미한다. 차이점은 손실 함수가 전체 데이터셋이 아닌 새로운 샘플에 대해서만 계산된다는 것이다.

 

이것은 증가 경사 하강법 (incremental gradient descent) 접근 방식의 기초입니다. 계수 kk는 매개변수가 얼마나 영향을 받을지를 조절하며 학습률 (learning rate)로 생각됩니다. 높은 학습률은 빠르게 변화하는 환경에서 바람직한데, 이는 모델이 데이터의 변화에 더 빠르게 적응할 수 있기 때문입니다. 낮은 학습률은 모델의 배치 버전 (batch version)을 근사하지만, 단기 예측에는 덜 유용한데, 이는 모델이 더 천천히 적응하기 때문입니다.

온라인 학습이 오프라인 학습의 기능적 근사치인 점을 고려할 때, 온라인 모델은 데이터 세트를 미리 알고 있는 오프라인 모델보다 일반적으로 덜 정확한 예측을 합니다. 온라인 모델의 주요 장점은 단기적인 트렌드에 반응하고 근시일 내 예측을 생성할 수 있다는 점입니다. 재고 예측이나 로드 밸런싱과 같은 시간 민감형 사용 사례에는 실시간으로 패턴을 학습할 수 있는 것이 유리할 수 있습니다. 반면, 배치 모델은 데이터의 분포가 시간이 지나도 변하지 않을 가능성이 높은 상황에서 선호됩니다.

1.3.3 오프라인 vs. 온라인 학습

1.3.2에서 논의했듯이, 온라인 학습은 데이터에 대한 기본적인 가정이 시간이 지남에 따라 변할 가능성이 있을 때 가장 효과적입니다. 이를 설명하기 위해 매일 웹사이트 방문자를 예측하는 시스템을 개발한다고 상상해 봅시다. 일일 방문자 수는 N = 𝜇 + v로 나타낼 수 있으며, 𝜇는 평균 일일 방문자를, v는 일일 변동량을 나타냅니다. 𝜇는 외부 요인에 따라 달라지기 때문에 계절적 추세, 바이럴 인터넷 이벤트 및 일반적인 인기 증가로 인해 변화할 가능성이 큽니다. 이러한 데이터 분포는 시간이 지남에 따라 평균, 분산 및 공분산과 같은 통계적 특성이 변하기 때문에 비정상적 분포(non-tationary distribution)라고 합니다. 날씨와 주식 시장과 같은 실제 비정상적 분포의 예는 계절적 및 장기적 추세에 따라 달라질 수 있기 때문입니다. 아래 그림은 정상적 분포와 비정상적 분포의 차이를 보여줍니다.

그림 1.6 정상적 방문자 분포(왼쪽)는 주기적 변동을 보여주지만 평균 방문자는 변하지 않습니다. 비정상적 분포(오른쪽)는 시간이 지나면서 방문자 수가 점진적으로 증가하는 것을 보여줍니다.


역사적 시계열 데이터의 일일 방문자 수에 대해 훈련된 머신 러닝 모델은 시간이 지남에 따라 증가하는 방문자 수의 영향을 받을 수 있다. 비정상성(non-stationarity)은 데이터 변환이나 직접 모델링을 통해 수정할 수 있다. 예를 들어 평균 방문자 수의 증가가 선형이라고 가정하고 오프라인 모델링 전에 이를 차감할 수 있다. 온라인 모델은 점진적으로 학습하여 비정상성을 고려한다. 오프라인 모델은 시간이 지남에 따라 데이터를 정적으로 유지하는 작업에 더 적합하다. 예를 들어, 개와 고양이를 구별하기 위한 이미지 분류기는 이미지 클래스가 시간에 따라 변하지 않으므로 오프라인에서 더 쉽게 훈련될 수 있다.

온라인 학습은 통계학에서의 이동 평균(rolling mean)과 유사하다. 전통적인 평균은 집합 내 모든 샘플의 평균이지만, 이동 평균은 최근 n개의 샘플에 대한 평균이다. 이동 평균은 데이터를 더 최근의 상태로 보기 위해 오래된 정보를 의도적으로 "잊는다." 온라인 모델은 새로운 데이터 샘플에 맞춰 모델 매개변수를 조정하면서 이와 유사한 기능을 수행한다. 잊는 효과는 데이터에 과적합(overfitting)을 방지하기 위한 일종의 규제 역할을 하기 때문에 때로는 바람직할 수 있다. 예를 들어, 뉴스 기반 모델은 선거 기간과 같은 일시적 시기에 학습한 특정 지식을 잊는 것이 유리할 수 있다. 딥 뉴럴 네트워크(deep neural networks)와 같은 일부 모델은 이전에 인코딩된 중요한 지식, 예를 들면 문법 규칙을 잃어버리는 "파멸적 망각(catastrophic forgetting)"에 취약할 수 있다. 이것은 각 학습 실행 중 업데이트되는 가중치 수를 줄여 완화할 수 있다.

온라인 학습 접근법이 문제 해결에 적합한지 평가할 때는 배포 시간, 관측 가능성, 유지 보수 비용과 같은 운영적 요소를 고려하는 것이 중요하다. 오프라인 학습 주기의 각 단계는 여러 분야에 걸쳐 많은 엔지니어링 시간과 지식을 필요로 한다. 온라인 모델은 개발 워크플로우가 생산 워크플로우와 동일하기 때문에 생산으로 옮기기가 훨씬 쉽다. 이러한 모델은 학습 과정의 일환으로 예측을 하므로 스테이징(staging) 또는 제품 환경과 호환된다. 반면 배치 모델은 생산에서 예측을 생성하기 위한 별도의 파이프라인을 필요로 한다.

온라인 모델의 독특한 점은 생산 중에도 학습한다는 것이다. 이는 거의 자율적으로 동작하여 수동으로 재훈련하고 오프라인으로 평가할 필요성을 제거한다. 이는 오프라인 모델에 비해 몇 가지 장점을 제공한다.

  • 데이터를 훈련용과 테스트용 세트로 나눌 필요가 없다. 동일한 데이터가 먼저 예측(테스트 데이터)을 하고 그 후 레이블이 제공되면 모델을 훈련(훈련 데이터)하는 데 사용된다.
  • 데이터 누출(테스트 데이터셋을 실수로 모델 학습에 사용하는 것)을 걱정할 필요가 없다.
  • 모델을 주기적으로 처음부터 재훈련할 필요가 없다. 모델은 지속적으로 학습하기 때문이다.
  • 실시간 모델은 한 번에 하나의 예제만 학습해야 하기 때문에 메모리 사용량이 감소한다.

온라인 모델의 한 가지 단점은 정확한 결과를 생성하고 있는지 확인하기 위해 지속적인 모니터링이 필요하다는 점입니다. 오프라인 학습과는 달리, 온라인 모델은 광범위하게 훈련된 오프라인 모델과 비슷한 품질의 결과를 보장하지 않습니다. 전통적으로 머신러닝은 데이터를 배치(batch)로 사용하여 모델을 훈련시키는 실험 중심의 반복적인 과정이 포함됩니다. 모델을 실제 환경에 배포하기 전에 여러 테스트를 수행하는 것이 더 쉽고, 모델에 데이터를 입력하기 전에 데이터를 정리하고 표준화하는 데 많은 작업이 수행됩니다. 온라인 학습에서는 데이터의 특성을 이해하고 온라인 모델을 훈련하는 데 사용할 특징을 결정하기 위해 일부 탐색적 데이터 분석을 수행할 수 있고 수행해야 합니다. 그러나 시스템에 실시간으로 데이터가 도착하는 특성 때문에 데이터 품질을 보장하는 데에 거의 통제력이 없습니다. 따라서 시스템에 보호 장치를 설치하고 문제가 발생할 경우의 완화 전략을 수립하는 데 훨씬 더 많은 시간이 필요합니다. 모델의 적응성과 정확성 사이에는 절충이 있습니다. 고위험 사용 사례에서는 모델을 오프라인으로 구축하여 실제 환경에 배포하기 전에 올바르게 검증하고 테스트하는 것이 바람직합니다.

온라인 학습은 빈번한 업데이트가 포함되므로 사람의 피드백 메커니즘에 자연스럽게 적합합니다. 사람의 피드백을 기반으로 작동하는 배치 모델은 사용자가 개선 사항을 보려면 특정 재훈련이 필요합니다. 사용자 선호도가 배치 모델이 재훈련되는 것보다 더 자주 변경될 수 있기 때문에 온라인 학습은 추천 시스템과 더 잘 호환됩니다. 다음 섹션에서 이를 더 자세히 논의할 것입니다.

표 1.1은 온라인 학습의 장단점을 제공합니다. 이는 온라인 학습을 사용할 새로운 프로젝트를 시작할 때 개발자가 고려해야 할 사항들입니다.

표 1.1 온라인 학습의 장단점  

장점   단점  
데이터에 대한 가정이 시간이 지남에 따라 변경될 가능성이 있는 사용 사례에 적합   중요한 역사적 지식을 잊어버리는 현상인 치명적 망각에 취약  
개발과 실제 환경 워크플로 간의 유사성 때문에 배포하기 쉬움  모델이 정확한 결과를 생성하고 있는지 확인하기 위해 지속적인 모니터링 필요  
온라인 모델은 실제 환경에서 학습하므로, 재훈련 워크플로가 필요하지 않음   온라인 모델을 훈련하는 데 사용하기 전에 데이터를 검사할 수 없으므로 더 많은 보호 장치와 완화 전략이 필요함  


온라인 모델의 광범위한 채택을 방해하는 한 가지 장애물은 스트림 지향 아키텍처가 필요하다는 점입니다. 전통적으로 원시 데이터를 소싱하고 데이터를 표준화하고 정규화하여 검색 가능한 상태로 변환한 후 중앙 저장소에 로드하는 과정을 설명하는 ETL(추출-변환-로드)이 사용됩니다. 이러한 데이터 파이프라인은 오프라인 학습을 더 일관되고 반복 가능하게 만듭니다. 온라인 학습은 STL(스트림-변환-로드)이라고 불리는 유사한 데이터 방법론을 필요로 합니다. 제2장에서 그 구현 세부 사항과 독특한 과제를 논의할 것입니다.

 

1.4 실시간 머신 러닝의 사용 사례  

기술 발전, 인터넷, 스마트폰, 소셜 미디어 등의 이유로 우리는 이제 온라인에서 훨씬 더 많은 시간을 보내는 세상에 살고 있다. 예를 들어, 대부분의 소매 쇼핑은 여전히 오프라인으로 이루어지지만, 온라인 쇼핑의 비율은 해마다 증가하고 있다. 미국 인구조사국에 따르면, 전체 소매 판매 중 온라인 판매의 비율은 1999년 4분기의 0.6%에서 2024년 1분기의 15.9%로 꾸준히 증가했다고 한다 (https://www. census.gov/ retail/ecommerce. html).

더 적은 사람들이 물리적인 신문을 읽고 대신에 CNN, Fox News와 같은 전통적인 미디어 서비스나 소셜 미디어를 통해 온라인에서 뉴스나 영상을 보고 있다. 또한 우리는 Netflix, YouTube, TikTok과 같은 서비스에서 엔터테인먼트를 소비하고 있다.

소비 패턴의 변화로 인해, 온라인 활동을 통해 더 많은 데이터가 생성되고 있으며, 기업들은 우리에게 상품과 서비스를 판매하기 위해 이 데이터를 이해하려고 노력하고 있다.

이미 설명한 바와 같이 Apache Spark와 같은 분산 시스템에서 실행할 수 있는 알고리즘 등 데이터 폭발에 대응하기 위한 머신 러닝 기술적 진보가 이루어졌다. 그러나 데이터의 양은 계속 증가하고 있고 인터넷의 정보 흐름 속도로 인해 데이터 특성의 빠른 변화와 같은 추가적인 복잡성이 있다. 다음은 전통적인 머신 러닝을 사용한 솔루션이 실패하는 몇 가지 상황이다:

  • 머신 러닝 모델을 훈련하는 데 사용된 데이터의 통계적 분포가 실제 환경에서 만나는 데이터의 분포와 크게 다를 때.
  • 웹사이트의 새로운 또는 드문 방문자와 같이 머신 러닝 모델을 훈련할 충분한 데이터가 없거나 전혀 없는 경우.
  • 데이터의 특성이 자주 변하고 모델이 배포되자마자 오래되기 쉬운 경우, 예를 들어 사용자 비디오 시청 선호도가 변하는 경우.

위의 사례가 보여주듯이, 전통적인 머신 러닝의 가장 큰 문제는 모델이 훈련되었을 때 매개변수와 가중치가 "고정"된다는 것이다. 모델 드리프트가 시간이 지나면서 모델의 정확도를 잃게 하는 방법에 대해 이미 논의하였다. 모델을 재훈련시키는 것은 전체 새로운 데이터 집합에서 처음부터 훈련해야 하기 때문에 비용이 많이 들고, 느리고, 오류가 발생하기 쉽다.

온라인 모델링 접근으로 전환하는 것은 이러한 문제를 완화시킬 수 있다. 배치 방식과 달리 실시간 머신 러닝 모델은 새로운 데이터에 계속해서 학습할 수 있다. 즉, 매 새로운 데이터 인스턴스마다 가중치와 매개변수가 업데이트된다. 결과적으로 이러한 모델은 더 빠르게 적응할 수 있으며, 한 번에 하나의 레코드에서 학습하기 때문에 메모리 사용량이 적어진다. 이제 실시간 머신 러닝에 더 적합한 몇 가지 사용 사례를 살펴보겠다.

 

1.4.1 추천 시스템

추천 시스템은 오래전부터 존재하고 있었습니다. 아마존은 20여 년 전에 온라인 상점에서 고객이 구매를 돕기 위해 추천 시스템을 사용하여 이 개념을 대중화했습니다.

추천 시스템을 구축하는 방법은 여러 가지가 있습니다. 사용자 프로필의 유사성을 사용하여 추천을 하는 협업 필터( Collaborative Filter)를 만들거나, 시계열 모델(Time Series Model)을 사용하여 사용자의 행동을 시간에 따라 모델링할 수 있습니다.

하지만 이렇게 생각해 보십시오. 유튜브에서 동영상을 감상하는 사용자가 있다고 가정합니다. 일주일 동안 귀하는 친구들이 정기적으로 귀여운 동영상을 공유했기 때문에 주로 고양이 동영상을 보는 데 관심이 있습니다. 그런데 첫 번째 직업으로 머신 러닝 엔지니어가 되어 이 분야에 대해 가능한 한 많이 배우고 싶습니다. 협업 필터나 시계열 모델을 사용해 구축된 모델은 귀하가 머신 러닝 동영상을 보기로 선호를 변경했음에도 계속해서 고양이 동영상을 추천할 것입니다. 이는 이러한 모델들이 귀하의 선호 변화에 대한 지식이 없기 때문입니다.

마찬가지로, 유튜브에 처음 가입한 사용자라면 어떤 동영상을 추천해야 할까요? 유튜브는 사용자에 대한 선호 이력이 없습니다. 사용자 상호작용을 신호로 사용하여 더 개인화된 추천 시스템을 구축하는 방법이 있습니다. 사용자가 시청, 좋아요를 누르거나 공유한 동영상과 사용자가 게시한 댓글과 같은 데이터는 모두 실시간 머신 러닝 모델에 피드로 제공되어 즉시 추천 동영상 목록을 생성하는 데 유용한 특징이 됩니다. 사용자가 행동을 변경함에 따라 모델도 이에 따라 반응하며 추천을 업데이트할 수 있습니다. TikTok은 사용자의 선호를 몇 시간 내로 이해하는데 성공하며 큰 성공을 이루었습니다 (  https://newsroom.tiktok.com/en-us/how- tiktok-recommends-videos-for-you). 처음 방문한 사용자들의 경우 인기 동영상이나 기타 지표를 기반으로 목록을 시작점으로 사용하고, 사용자 상호작용에 따라 목록을 변경하는 방법을 학습할 수 있습니다.

또한, 실시간 머신 러닝 모델은 더 응답성이 뛰어나며 갑작스러운 변화나 사용자 활동의 급증에 잘 적응할 수 있습니다. 예를 들어, 바이랄 뉴스 이벤트 발생 시 모든 사용자가 이 뉴스 관련 콘텐츠를 보고 싶어 한다면, 배치 데이터로 학습된 모델은 이 이벤트에 대한 정보를 가지고 있지 않기 때문에 관련 콘텐츠가 사용자 피드 상단에 표시되지 못합니다. 하지만 실시간 머신 러닝 알고리즘은 이를 빠르게 탐지하고 이에 맞춰 반응할 수 있습니다. 이를 통해 추천이 시의적절하고 효과적으로 이루어질 수 있으며, 그 결과 사용자 만족과 참여도가 높아집니다.

실시간 머신 러닝 모델은 시간, 위치, 기기 유형, 최근 브라우저 기록과 같은 실시간 맥락 정보를 특징으로 활용하여 더 나은 모델을 개발하는 데에 더 적합합니다. 또한, 온라인 플랫폼은 즉각적인 사용자 피드백(클릭, 좋아요, 공유, 체류 시간) 등을 사용하여 여러 실험을 동시에 수행하고 결과를 통해 추천을 업데이트할 수 있습니다. 

 

1.4.2 이상 탐지

배치 방식과 달리, 실시간 이상 탐지 알고리즘은 데이터가 도착하자마자 이상 패턴을 식별할 수 있다. 이 기능은 네트워크 트래픽 데이터, IoT 시스템, 금융 거래 등과 같이 데이터가 지속적으로 도착하고 변화하는 환경에서 특히 유용하다.

금융 거래를 예로 들어보자. 온라인으로 진행되는 금융 거래의 수가 꾸준히 증가하고 있다. 점점 더 많은 사람들이 지역 은행에서 수표를 예금하고 현금으로 물건과 서비스를 지불하지 않는다. 실제로 많은 물건이 물리적 상점 대신 온라인에서 구매되고 있다. 이 때문에 모든 것이 컴퓨터 뒤에서 이루어지므로 사기 행위의 가능성이 다양하게 열린다.

이 분야에서는 특히 이상 탐지 알고리즘을 사용하여 비정상적인 거래를 감지하는 데 여러 가지 발전이 이루어졌다. 예를 들어, 누군가가 당신의 신용카드를 훔치거나 온라인에서 당신의 신용카드 정보를 획득할 수 있다면, 그들은 당신의 신용카드를 사용하여 구매를 할 수 있다. 이상 탐지 알고리즘은 이들 구매의 특성 차이를 감지하고 이를 사기로 표시하는 데 매우 성공적이다.

그러나 사기 패턴은 계속 변화하고 사기꾼들은 과거에 사용했던 전략이 더 이상 작동하지 않는다는 것을 알게 되면 즉시 전략을 바꾼다. 이 모든 것을 악화시키는 것은 수집되는 데이터의 양이다. 이것은 배치 학습 모델을 사용하는 시스템에 부담을 주는데, 이 모델은 변화에 따라 더 자주 재훈련되어야 한다. 다행히도, 실시간 머신 러닝의 발전으로 데이터가 도착할 때 점진적으로 학습할 수 있는 이상 탐지 알고리즘이 있으며, 사용자 구매 이력에 대한 지식을 가질 뿐만 아니라 들어오는 데이터 스트림의 변화에 신속히 적응할 수 있다. 이는 이상치를 빠르게 반응하고 너무 늦기 전에 이를 감지할 수 있음을 의미한다. 금융 거래의 경우, 속도가 중요하다. 왜냐하면 사기 거래를 빠르게 탐지하고 처리함으로써 금융 기관이 손실을 최소화할 수 있기 때문이다.

사기 탐지 외에도 실시간 이상 탐지는 거래 및 투자에 잘 맞다. 주식 시장은 수 밀리초 단위로 가격 변동을 경험하며, 배치 데이터로 훈련된 거래 모델은 시장 상황의 변화에 ​​대응하는 데 어려움을 겪는다. 온라인 머신 러닝 모델은 뉴스, 경제 지표, 가격 데이터와 같은 여러 출처에서 실시간 데이터를 수집하여 최신 트렌드를 반영하는 가격 및 투자 전략을 최적화할 수 있는 능력을 갖추고 있다.

 

사이버 보안 분야에서는 실시간 이상 감지 알고리즘이 네트워크 트래픽 로그 또는 시스템 동작 패턴을 분석하는 데 더 적합하다. 이는 실시간으로 탐지 능력을 조정할 수 있기 때문이다. 위협 행위자들은 지속적으로 그들의 전술과 기술을 발전시키기 때문에 이는 중요하다. 이상 감지 모델은 수백만 개의 데이터 패킷을 지속적으로 분석하여 정상적인 동작 패턴에서의 편차를 감지하고 즉시 경보를 발생시킴으로써 잠재적 피해가 확산되기 전에 완화할 수 있다. 네트워크 로그 모니터링 외에도, 이상 감지 알고리즘은 엔드포인트 동작, 애플리케이션 사용 패턴 및 시스템 구성을 모니터링하는 데 사용될 수 있다. 이를 한 단계 더 발전시켜 이러한 모델을 사이버 보안 운영을 자동화하는 프로세스의 일부로 통합할 수 있다. 예를 들어, 실시간 침입 탐지 시스템(ID: Intrusion Detection System)은 의심스러운 동작이 감지되었을 때 경고를 보낼 뿐만 아니라, 의심스러운 IP 주소를 자동으로 차단하거나 실시간으로 손상된 장치를 격리하여 사건 대응 시간을 단축하고 대규모 공격을 예방한다. 요컨대 실시간 기계 학습은 사이버 보안 전문가와 사이버 범죄자 간의 경쟁 구도를 균형 있게 만들어 줄 수 있다.

의료 분야에서는 이상 감지 알고리즘이 실시간 모니터링 및 조기 감지에 사용될 수 있다. 많은 사람들이 심박수, 신체 활동, 혈압, 혈당 및 기타 건강 지표를 모니터링하는 웨어러블 기기를 사용하고 있다. 이러한 기기들은 실시간으로 데이터 스트림을 생성하며, 이는 실시간 이상 감지 알고리즘에 의해 처리 및 분석되어 비정상 심박, 환자 상태의 예상치 못한 변화 및 일반 건강 패턴에서의 편차를 감지한다. 이러한 알고리즘은 기기에서 들어오는 새로운 정보를 수용함에 따라 지속적으로 적응하고 발전할 수 있다.

건강 관련 데이터 모니터링 외에도 이상 감지 알고리즘은 병원 입원, 침대 점유율, 약물 사용 등의 데이터를 지속적으로 모니터링하여 운영 효율성을 향상하고 자원 배분, 일정 관리 및 환자 간호를 최적화하는 데 사용할 수 있다.

실시간 이상 감지 알고리즘은 IoT 시스템 모니터링에 특히 적합하다. 상호 연결된 기기와 센서의 증가로 인해 이러한 시스템이 적절하게 최적의 상태로 작동하는 것이 중요하다. 온도, 습도, 에너지 소비와 같은 지표를 모니터링하여 정상 패턴에서의 편차를 감지하고 잠재적 고장, 침입 또는 성능 저하를 감지할 수 있다. 이러한 알고리즘의 실시간 특성은 시스템 전체의 치명적인 실패를 방지하고, 장비 수명을 연장하며, 가동 중단 시간을 줄임으로써 운영 효율성을 개선하고 유지 보수 비용을 최소화할 수 있게 보장한다.

IoT 장치는 종종 처리 능력과 대역폭이 제한된 리소스 제약 엣지 컴퓨팅 환경에서 운영된다. 실시간 모델은 엣지에서 경량 이상 감지 작업을 로컬로 수행하여 지연을 줄이고 네트워크 대역폭을 절약할 수 있다. 이러한 모델은 중앙 집중식 클라우드 서버에 연결할 필요 없이 신속히 이상을 감지할 수 있다. 이러한 분산된 접근 방식은 확장성, 응답성 및 IoT 배치에서의 효율성을 향상 시키며, 특히 스마트 시티, 자율 차량, 원격 자산 모니터링과 같은 실시간 이상 감지가 필요한 응용 분야에서 유용하다.

 

1.4.3 강화 학습 (Reinforcement Learning)

강화 학습은 환경과의 상호 작용을 통해 학습하는 기계 학습 (Machine Learning) 분야입니다. 모델은 환경의 맥락을 사용하여 행동을 결정하고, 그 행동에 관련된 보상 또는 손실을 사용하여 환경에 적응합니다. 강화 학습은 특히 복잡한 의사 결정 작업에 유용합니다.

강화 학습의 또 다른 장점은 레이블이 있는 데이터가 필요하지 않다는 점인데, 이로 인해 모델이 더 유연하고 특히 실시간 맥락에서 적응 가능합니다. 반면에 오프라인 모델은 환경과 직접 상호 작용하지 않습니다. 대신, 역사적인 맥락과 보상을 사용하여 어떤 행동을 취할지 결정합니다. 이 접근 방식은 환경의 변화하는 조건에 대응할 수 없어 의사 결정 오류로 이어질 수 있는 단점을 가지고 있습니다.

금융 분야에서는 온라인 강화 학습을 시장 메이킹에 사용할 수 있습니다. 주식 시장에서 시장 메이커는 매수와 매도를 위한 가격을 제안하여 유동성을 유지하는 역할을 합니다. 강화 학습 알고리즘은 공급과 수요, 가격 움직임, 주문 흐름을 모델링하여 최적의 시장 메이킹 전략을 학습할 수 있습니다. 강화 학습 알고리즘은 시장에서 학습하고 입찰-요청 스프레드를 동적으로 조정하여 수익성(보상)을 극대화하고 위험(손실)을 최소화할 수 있습니다.

온라인 강화 학습은 알고리즘 트레이딩 및 포트폴리오 관리에 사용할 수도 있습니다. 강화 학습 알고리즘은 과거 데이터뿐만 아니라 현재 시장 조건으로부터 학습하여 거래 전략을 최적화할 수 있습니다. 예를 들어, 보상 함수는 시간이 지남에 따른 포트폴리오의 누적 수익이 될 수 있으며, 이러한 알고리즘은 누적 수익을 극대화할 가능성이 있는 행동을 기반으로 매수 및 매도 결정을 내릴 수 있습니다. 유사하게, 강화 학습은 가격 변동, 금리 변동과 같은 경제적 조건, 지정학적 사건과 같은 데이터를 사용하여 포트폴리오의 잠재적 위험을 헤지하기 위한 최적의 전략을 만드는 데 사용할 수 있습니다.

한 단계 더 나아가, 강화 학습 알고리즘은 인간의 피드백과 결합되어 RLHF라는 이름으로도 알려져 있습니다. RLHF는 주로 로봇 공학에서 사용되던 2008년쯤부터 존재했습니다. 최근 Open AI의 ChatGPT의 성공으로 인해 대중화되었습니다. 변환기 모델의 발명으로 언어 모델은 수년 동안 지속적인 혁신을 통해 개선되고 있습니다. 그러나 인상적인 결과에도 불구하고 모델은 항상 좋은 출력을 생성하지 못했습니다. RLHF는 인간의 피드백을 활용하여 출력 품질을 향상시키는 기술로 사용되었습니다. 목표는 텍스트 입력을 받아들여 인간의 선호도를 나타내는 스칼라 보상을 반환하는 모델을 얻는 것입니다. 이러한 보상 모델은 미세 조정된 언어 모델이거나 인간 선호 데이터로 처음부터 훈련된 언어 모델이 될 수 있습니다. 이제 텍스트로 훈련된 모델과 인간 선호 데이터로 훈련된 두 번째 모델이 있으므로, 보상 모델을 사용하여 원래 언어 모델을 미세 조정하기 위해 강화 학습을 사용할 수 있습니다.

 

RLHF는 다른 응용 프로그램에서도 사용할 수 있다. 한 가지 응용 예는 피싱과 사회 공학적 기법을 감지하는 것이다. 사람의 피드백을 사용하여 스팸 필터 및 기타 통제를 우회하는 피싱 이메일을 식별할 수 있다. 정보 분석가는 사기성 URL, 오해를 유발하는 내용 또는 사이버 범죄자들이 사용하는 사칭 전술과 같은 피싱 시도의 특성에 대해 피드백을 제공할 수 있다. 이러한 피드백은 스트리밍 애플리케이션을 통해 수집되고 강화 학습 알고리즘에 제공되어 새로운 피싱 및 사회 공학적 공격을 효과적으로 탐지하기 위한 최신 정보를 확보할 수 있다.

의료 분야에서는 RLHF가 의사 결정 지원 시스템에 사람의 피드백을 통합하여 의사 결정을 간소화하고 향상할 수 있다. 의료 전문가들은 모델이 제안하는 진단의 정확성, 치료 추천, 환자 관리 계획에 대해 피드백을 제공할 수 있다. 모델은 그 피드백을 사용하여 더 나은 추천을 제공할 수 있다.

이 장에서 실시간 데이터가 무엇인지, 그리고 시스템에 데이터가 도착함과 동시에 계속적으로 훈련하고 예측을 생성할 수 있는 온라인 모델을 구축하는 방법을 이해하였다. 실시간 기계 학습에 특히 적합한 몇 가지 사용 사례를 제시하였다. 다음 장에서는 이벤트 기반 아키텍처를 사용하여 데이터 수집 파이프라인을 구축하기 위해 실시간 데이터를 수집하는 방법을 배울 것이다.

1.5 요약

  • 실시간 데이터는 어떤 형태의 저장소에도 저장되지 않은 움직이는 데이터다. 
  • 실시간 기계 학습은 실시간 데이터를 사용하여 환경 변화에 적응하는 예측 시스템을 구축하는 접근 방식이다. 
  • 오프라인 학습은 역사의 배치 데이터로부터 모델을 훈련하는 것이다. 
  • 온라인 학습은 데이터가 시스템에 도착하는 즉시 점진적으로 모델을 훈련한다. 
  • 오프라인 학습 흐름은 동기식으로, 각 구성 요소가 이전 요소에 의존한다. 
  • 온라인 학습 흐름은 비동기식으로, 모든 구성 요소가 서로 분리되어 있다. 
  • 온라인 학습은 데이터 분포의 급격한 변화가 있는 사용 사례를 해결하기 위해 진화하였다. 
  • 특성 이동 또는 데이터 이동은 모델 훈련에 사용되는 특성의 분포가 시간에 따라 변화할 때 발생한다. 
  • 개념 이동은 입력 특성과 출력 라벨 간의 관계가 시간에 따라 변화할 때 발생한다. 
  • 실시간 기계 학습에 적합한 몇 가지 사용 사례로는 추천 시스템, 이상 탐지, 강화 학습 등이 있다.

 



[1]
광섬유 케이블의 전송 속도는 빛의 속도 c와 굴절률 n에 따라 달라진다. 여기서 단일 모드 광섬유의 n=1.46으로 가정하여 이상적인 전송 속도는 v = c / n = 300,000 km/s / 1.46 = ~205,000 km/s가 된다.

반응형

+ Recent posts