CocoIndex: 실시간 증분 데이터 파이프라인을 위한 AI 프레임워크

CocoIndex 소개

LLM을 활용한 AI 애플리케이션이 확산되면서, 실시간으로 최신 데이터를 반영할 수 있는 데이터 파이프라인의 중요성이 높아졌습니다.

CocoIndex는 기존의 배치 기반 ETL이나 단순한 SQL 뷰 방식과 달리, 증분 처리, 자동 인덱싱, LLM 통합 등을 지원하는 차세대 오픈소스 프레임워크입니다. Rust 기반 고성능 엔진을 바탕으로 Python과 TypeScript에서 쉽게 사용할 수 있어, AI/ML 파이프라인을 구축하는 모든 개발자에게 도움이 될만한 프로젝트입나다.

image

CocoIndex는 변경된 데이터만 추적하고 업데이트하는 증분 처리 기반의 데이터 변환 프레임워크입니다. AI에서 자주 활용되는 텍스트 문서, 데이터베이스, 외부 API 등의 데이터를 실시간으로 처리하고, 벡터/그래프 인덱스 등 다양한 스토리지로 자동 연동할 수 있게 해줍니다.

CocoIndex의 주요 구성 요소 및 특징

CocoIndex는 데이터 흐름을 효율적으로 구성하고 실행하기 위한 선언형 프레임워크입니다. 각 구성 요소는 AI 기반 파이프라인을 손쉽게 구축할 수 있도록 설계되어 있으며, 증분 처리, 자동화된 인덱싱, LLM 연동 등 다양한 기능을 조화롭게 통합할 수 있습니다.

1. Flow 정의: 전체 파이프라인의 중심 구조

CocoIndex의 모든 작업은 @cocoindex.flow_def 데코레이터로 정의된 Flow 함수 안에서 이루어집니다. 이 Flow는 데이터의 입력부터 출력까지 전체 흐름을 표현하는 선언형 단위입니다. Flow 내부에서는 소스 등록, 데이터 처리, 수집 및 저장을 체계적으로 구현할 수 있습니다.

@cocoindex.flow_def(name="ExampleFlow")
def example_flow(flow, data):
    ...

Flow 이름은 내부적으로 DAG 그래프를 구성할 때 식별자로 사용되며, CLI 도구에서도 해당 이름을 기반으로 관리됩니다.

2. 데이터 소스 추가: flow.add_source

Flow 내부에서는 flow.add_source()를 통해 입력 데이터를 선언합니다. 이 함수는 다양한 소스를 지원하며, 텍스트 문서, JSON 파일, API, S3, SQS 등 다양한 형태의 입력을 추상화합니다.

예를 들어, 로컬의 마크다운 파일을 소스로 사용할 경우 다음과 같이 정의할 수 있습니다:

data["docs"] = flow.add_source(
    cocoindex.sources.LocalFile(path="docs", file_extension=".md"))

소스 항목은 data["docs"]와 같이 키-값 형태로 사용되며, 이후 연산에서 기준이 되는 테이블 역할을 합니다.

3. 행 단위 처리: with ... row() as ...

데이터 소스는 일반적인 테이블(row 기반) 구조로 처리됩니다. 행 단위의 연산을 위해 with data["docs"].row() as doc: 구문을 사용하여 개별 항목을 순회하고, 필요한 변환이나 수집 작업을 수행합니다.

with data["docs"].row() as doc:
    doc["chunks"] = doc["content"].transform(...)

이 구문은 SQL의 SELECT ... FROM ...과 유사한 개념으로, 개별 항목에 대해 변환 함수나 후속 연산을 적용합니다.

4. 데이터 변환: transform() 메서드

transform()은 CocoIndex에서 데이터를 처리하는 기본 수단입니다. 이 메서드는 입력값을 받아, LLM 기반 처리나 일반적인 함수 처리 등을 적용하여 새로운 데이터를 생성합니다.

예를 들어 텍스트를 특정 길이로 분할하고 임베딩하는 경우 다음과 같습니다:

doc["chunks"] = doc["content"].transform(
    cocoindex.functions.SplitRecursively(), chunk_size=512)

chunk["embedding"] = chunk["text"].transform(
    cocoindex.functions.SentenceTransformerEmbed(
        model="sentence-transformers/all-MiniLM-L6-v2"))

이때 내부적으로는 변환 DAG가 생성되며, 데이터 변경 시 이 DAG을 따라 필요한 부분만 재계산하게 됩니다. 이 덕분에 전체 재처리 없이 변경된 부분만 빠르게 업데이트할 수 있습니다.

5. 결과 수집: add_collector()collect()

처리된 데이터를 저장하기 위해 data.add_collector()를 통해 수집기(Collector)를 선언합니다. 이후 .collect() 메서드를 통해 저장할 레코드를 지정할 수 있습니다.

collector = data.add_collector()

with chunk.row() as ch:
    collector.collect(filename=doc["filename"], text=ch["text"], embedding=ch["embedding"])

수집기는 Flow의 최종 출력을 담당하며, export를 통해 외부 DB로 저장되기 전 중간 상태를 구성합니다.

6. 결과 내보내기: export()

Collector는 .export() 메서드를 사용하여 데이터를 Postgres, Qdrant, Neo4j 등 다양한 타깃 스토리지로 보낼 수 있습니다. 이때 자동으로 스키마가 생성되고, 필요 시 벡터 인덱스도 자동 설정됩니다.

collector.export(
    "embeddings",
    cocoindex.targets.Postgres(),
    primary_key_fields=["filename", "location"],
    vector_indexes=[cocoindex.VectorIndexDef(field_name="embedding")]
)

이 선언만으로도 스키마 생성, 인덱스 설정, 데이터 삽입까지 모두 자동으로 처리할 수 있습니다.

7. LLM 연동 및 커스텀 함수

CocoIndex는 LLM 기반 작업을 함수형으로 통합할 수 있는 다양한 방법을 제공합니다. 대표적으로 OpenAI API 기반의 텍스트 요약, JSON 구조화, 분류 작업 등이 내장되어 있으며, 사용자는 Python 함수 또는 FunctionSpec/FunctionExecutor 방식으로 자신만의 커스텀 함수를 정의할 수도 있습니다.

@cocoindex.function_def(name="CustomUpperCase")
def uppercase_fn(text: str) -> str:
    return text.upper()

이렇게 정의된 함수도 .transform() 내부에서 바로 사용할 수 있으며, 다른 변환 함수와 동일한 증분 처리 흐름에 포함됩니다.

8. CLI 및 라이브러리 실행

Flow 정의는 CLI를 통해 간단하게 실행할 수 있으며, 아래 명령어들이 기본적으로 제공됩니다:

# 스키마 및 상태 초기화
cocoindex setup yourflow.py

# 변경된 데이터에 대한 증분 처리
cocoindex update yourflow.py

# 저장된 결과의 검색 정확도 등 평가
cocoindex evaluate yourflow.py --output-dir ./eval_result

라이브러리 모드에서는 FlowLiveUpdater, evaluate_and_dump() 등 고급 API를 사용하여 커스텀 통합도 가능합니다.

라이선스

CocoIndex 프로젝트는 Apache-2.0 License로 배포되며, 상업적 이용에 제약이 없습니다.

:house: CocoIndex 홈페이지

:github: CocoIndex 프로젝트 GitHub 저장소

:books: CocoIndex 공식 문서




이 글은 GPT 모델로 정리한 글을 바탕으로 한 것으로, 원문의 내용 또는 의도와 다르게 정리된 내용이 있을 수 있습니다. 관심있는 내용이시라면 원문도 함께 참고해주세요! 읽으시면서 어색하거나 잘못된 내용을 발견하시면 덧글로 알려주시기를 부탁드립니다. :hugs:

:pytorch:파이토치 한국 사용자 모임:south_korea:이 정리한 이 글이 유용하셨나요? 회원으로 가입하시면 주요 글들을 이메일:love_letter:로 보내드립니다! (기본은 Weekly지만 Daily로 변경도 가능합니다.)

:wrapped_gift: 아래:down_right_arrow:쪽에 좋아요:+1:를 눌러주시면 새로운 소식들을 정리하고 공유하는데 힘이 됩니다~ :star_struck: