Airflow AI SDK, Apache Airflow에 LLM 활용을 위한 SDK

Airflow AI SDK 소개

최근 AI와 머신러닝 분야에서 대규모 언어 모델(LLM)의 활용이 급증하고 있습니다. 이러한 모델을 데이터 파이프라인에 통합하여 자동화하고자 하는 요구가 높아지는 가운데, Apache Airflow에서 LLM과 AI 에이전트를 손쉽게 활용할 수 있는 SDK가 공개되었습니다. Apache Airflow는 데이터 파이프라인의 스케줄링과 모니터링을 위한 오픈 소스 플랫폼으로, 유연한 스케줄링, 동적 태스크 매핑, 분기 및 조건 로직, 오류 처리, 리소스 관리, 모니터링, 확장성 등 다양한 기능을 제공합니다.

최근 Astronomer에서 공개한 Airflow AI SDK는 이러한 Airflow의 기능을 확장하여, LLM과 AI 에이전트를 파이프라인에 직접 통합할 수 있도록 지원합니다. 이를 통해 개발자와 데이터 엔지니어는 복잡한 AI 워크플로우를 보다 효율적으로 구축하고 관리할 수 있습니다.

기존에는 LLM과 AI 에이전트를 활용하기 위해 별도의 스크립트나 외부 도구를 사용해야 했습니다. 그러나 Airflow AI SDK를 활용하면 Airflow의 태스크 데코레이터를 통해 이러한 기능을 직접 통합할 수 있어, 워크플로우의 일관성과 관리 효율성이 향상됩니다.

Airflow AI SDK의 주요 기능

  • @task.llm 데코레이터: LLM 호출을 위한 태스크를 정의할 수 있습니다.
  • @task.agent 데코레이터: AI 에이전트와 상호작용하는 태스크를 생성할 수 있습니다.
  • @task.llm_branch 데코레이터: LLM의 출력에 따라 DAG의 흐름을 동적으로 제어할 수 있습니다.

사용 방법

Airflow AI SDK는 Pydantic AI를 기반으로 구축되었으며, Airflow 파이프라인에서 LLM과 AI 에이전트를 손쉽게 활용할 수 있도록 설계되었습니다. 예를 들어, GitHub의 커밋 로그를 요약하는 워크플로우는 다음과 같이 구성할 수 있습니다:

import os
import pendulum
from airflow.decorators import dag, task
from github import Github

@task
def get_recent_commits(data_interval_start: pendulum.DateTime, data_interval_end: pendulum.DateTime) -> list[str]:
    gh = Github(os.getenv("GITHUB_TOKEN"))
    repo = gh.get_repo("apache/airflow")
    commits = repo.get_commits(since=data_interval_start, until=data_interval_end)
    return [f"{commit.commit.sha}: {commit.commit.message}" for commit in commits]

@task.llm(
    model="gpt-4o-mini",
    system_prompt="""
    Your job is to summarize the commits to the Airflow project given a week's worth
    of commits. Pay particular attention to large changes and new features as opposed
    to bug fixes and minor changes.
    """
)
def summarize_commits(commits: list[str]) -> str:
    return "\n".join(commits)

@dag(schedule_interval="0 0 * * 0", start_date=pendulum.datetime(2023, 1, 1, tz="UTC"), catchup=False)
def github_changelog():
    commits = get_recent_commits()
    summary = summarize_commits(commits)
    print(summary)

dag = github_changelog()

이러한 기능을 활용하여 다양한 AI 워크플로우를 구축할 수 있습니다.

라이선스

Airflow AI SDK 프로젝트는 Apache-2.0 라이선스로 공개 및 배포되고 있습니다.

:scroll: Airflow AI SDK 소개 블로그 포스트

:github: Airflow AI SDK GitHub 저장소

:github: Airflow AI SDK 예제 저장소




이 글은 GPT 모델로 정리한 글을 바탕으로 한 것으로, 원문의 내용 또는 의도와 다르게 정리된 내용이 있을 수 있습니다. 관심있는 내용이시라면 원문도 함께 참고해주세요! 읽으시면서 어색하거나 잘못된 내용을 발견하시면 덧글로 알려주시기를 부탁드립니다. :hugs:

:pytorch:파이토치 한국 사용자 모임:south_korea:이 정리한 이 글이 유용하셨나요? 회원으로 가입하시면 주요 글들을 이메일:love_letter:로 보내드립니다! (기본은 Weekly지만 Daily로 변경도 가능합니다.)

:wrapped_gift: 아래:down_right_arrow:쪽에 좋아요:+1:를 눌러주시면 새로운 소식들을 정리하고 공유하는데 힘이 됩니다~ :star_struck: