logo

AI Sales Copilot 개발 기록 - (2) Streaming Queue

작성 2025년 1월 22일

업데이트 2025년 1월 22일

이전 글 AI Sales Copilot 개발 기록 - (1) 기획하는 개발자

AI 스트리밍의 재구성

구현 단계에서 맞닥뜨린 첫 번째 문제는 AI Meeting Cheat-sheet에서 제공하는 컨텐츠가 한 페이지에 섹션순으로 스트리밍 되어야 한다는 요구사항이었다. 주제별로 프롬프트가 있으니 총 6개의 프롬프트로 요청이 이루어진다. 각 요청은 AI에 순차적으로 전송되지만 응답은 웹소켓을 통해 비동기로 스트리밍되기 때문에 순서를 보장할 수 없고 사실상 병렬 처리된다. 프론트엔드에서는 응답 데이터를 순서별로 다시 줄을 세워서 화면에 렌더링해야 한다. 즉, AI 스트리밍을 재구성해야 하는 것이다.

데이터의 흐름

AI에서 여러 개의 프롬프트 요청에 대한 스트리밍 응답을 병렬로 받고 있는 상황을 도식화하면 아래와 같다. 각 섹션별로 데이터가 랜덤으로 들어오고, 속도도 제각각이다.

Sections

도식에서 동그라미로 표현된 데이터의 한 조각에는 AI가 생성한 컨텐츠 외에도 해당 컨텐츠가 소속된 섹션 데이터가 함께 포함되어 있다. 예를 들면 아래와 같은 형태이다.

{
  "sectionKey": "HOOK_POINTS",
  "sectionStatus": "STREAMING",
  "content": {
    "title": "Seamless Communication",
    "description": "Operating in the similar timezone as Happy Client, Tsunagaru Inc. ensures effective and timely communication, facilitating smooth project execution and collaboration.",
    "factors": ["Speed of Implementation", "Technical Support"],
  }
}

여기서 sectionKey 값에 따라 content의 자료 구조는 동적으로 변한다. 이런 상황에서 데이터를 섹션 순서별로 재구성하려면, 첫 번째 섹션인 Hook Points를 제외하고는 받은 대로 렌더링하는 것이 아니라 순서에 맞춰 별도로 저장해 두었다가 렌더링해야 한다. 하나의 데이터 조각이 들어오는 시점에 sectionKey와 매칭되는 queue map에 content를 쌓아두고, 실제 사용자가 보는 화면에 렌더링할 때는 섹션 순서별로 queue를 실행함으로써 순서를 보장하는 것이다. 전체 섹션이 정렬되는 상위 레벨의 queue를 Processed Snapshot이라고 이름지었는데, Snapshot은 Yess 웹앱에서 실시간 동시편집을 다룰 때 특정 시점에 저장된 데이터의 상태를 의미한다. 이를 도식화하면 아래와 같은 모습이 된다. (이미지를 클릭하면 원본 열람 가능)

Processed Snapshot

rxjs로 queue 구현하기

위의 도식에서 RS라고 표시된 것은 해당 queue가 하나의 ReplaySubject와 매칭됨을 의미한다. ReplaySubjectrxjs에서 제공하는 Observable의 한 형태이다. Observable은 구독을 통해 파이프 스트림 형태의 로직을 수행하는 API인데, 그 중에서도 ReplaySubject는 First-In-First-Out(FIFO) 방식으로 값을 multicast하는 특징이 있어서 스트리밍 컨텐츠에 대한 queue를 구현하기에 적합하다. 구현된 queue 모듈을 간단하게 재현하면 아래 코드와 같다.

type SectionKey = 'HOOK_POINTS' | 'OFFERS' | 'AGENDA' | 'QUESTIONS' | 'RISKS' | 'TERMS';
type SectionStatus = 'NOT_STARTED' | 'STREAMING' | 'COMPLETE' | 'UNHEALTHY';
type SectionQueue = ReplaySubject<Delta>; // Delta는 데이터 조각의 명칭

class StreamingQueue {
  /**
   * sectionKey별로 섹션의 queue를 저장하는 맵
   */
  private sectionQueueMap: Map<SectionKey, SectionQueue> = new Map();

  /**
   * 전체 섹션을 순서에 따라 재구성된 형태로 저장하는 상위 레벨의 queue
   */
  private processedSnapshotQueue: ReplaySubject<SectionQueue> = new ReplaySubject();

  /**
   * 인스턴스 생성 시점에 sectionKey 배열을 인자로 받아 sectionQueueMap과 processedSnapshotQueue를 초기화한다.
   */
  constructor({ sectionKeys }: { sectionKeys: SectionKey[] }) {
    sectionKeys.forEach((sectionKey) => {
      const sectionQueue = new ReplaySubject<Delta>();
      this.sectionQueueMap.set(sectionKey, sectionQueue);
      this.processedSnapshotQueue.next(sectionQueue);
    });
  }

  /**
   * 해당되는 sectionKey에 매칭되는 queue를 업데이트한다.
   * 새로운 데이터 조각이 들어온 시점에 실행한다.
   */
  public enqueue({
    sectionKey,
    delta,
  }: {
    sectionKey: SectionKey;
    delta: Delta;
  }): void {
    const targetsectionQueue = this.sectionQueueMap.get(sectionKey);

    if (!targetsectionQueue) {
      throw new Error(`no matched section queue for ${sectionKey}`);
    }

    targetsectionQueue.next(delta);
  }

  /**
   * 해당되는 sectionKey에 매칭되는 queue를 종료 처리한다.
   * 섹션의 스트리밍 상태가 완료(또는 에러)로 바뀌면 실행한다.
   */
  public completeSectionQueue(sectionKey: SectionKey): void {
    const targetSectionQueue = this.sectionQueueMap.get(sectionKey);

    if (!targetSectionQueue) {
      throw new Error(`no matched section queue for ${sectionKey}`);
    }

    targetSectionQueue.complete();
  }

  /**
   * 사용자 화면에 컨텐츠를 렌더링하기 위해 queue를 실행한다.
   * processedSnapshotQueue > sectionQueue 순서로 파이프 스트림이 흘러간다.
   */
  public async subscribe(
    callback: (delta: Delta) => Promise<void>,
  ): Promise<void> {
    from(this.processedSnapshotQueue)
      .pipe(
        concatMap((sectionQueue) => // 병합 연산자 중 순서가 보장되는 concatMap을 사용한다.
          sectionQueue.pipe(
            bufferCount(1), // processedSnapshotQueue 에서 sectionQueue 를 1개씩 처리하기 위해 사용한다.
            concatMap((deltas) =>
              from(deltas).pipe(
                bufferCount(1), // sectionQueue 에서 delta 를 1개씩 처리하기 위해 사용한다.
                delay(100), // 유저에게 스트리밍 효과를 주기 위해 100ms 대기한다.
                concatMap(([delta]) => callback(delta)), // 화면에 렌더링하기 위한 로직을 실행한다.
              ),
            ),
          ),
        ),
      )
      .subscribe();
  }

  /**
   * 전체 queue를 종료 처리한다.
   * 모든 섹션의 스트리밍 상태가 완료(또는 에러)로 바뀌면 실행한다.
   */
  public unsubscribe(): void {
    this.sectionQueueMap.forEach((sectionQueue) => {
      sectionQueue.complete();
      sectionQueue.unsubscribe();
    });
    this.processedSnapshotQueue.complete();
    this.processedSnapshotQueue.unsubscribe();
  }
}

해당 코드에서 subscribe 로직 안에 있는 callback 호출이 실제로 화면에 데이터를 렌더링하기 위한 부분이다. 주석에 적힌 것처럼 Delta는 Yess 웹앱에서 실시간 동시 편집 데이터를 다룰 때 사용하는 자료 구조의 명칭인데, 이 Delta에 있는 컨텐츠를 참고하여 렌더링한다.

queue 적용하기

구현된 queue 모듈은 기존의 프론트엔드 코드베이스의 컨벤션에 맞춰서 React Context API와 조합하여 적용하였다. 페이지 단위에 씌워진 StreamingContext에서는 실시간 동시 편집 데이터를 다루는 Delta 엔진에 이벤트 핸들러를 등록한다. Delta 엔진에서 스냅샷 버전이 업데이트될 때마다 발생하는 이벤트를 통해서 Delta의 내용 중 어떤 섹션이 변경되었는지, 섹션의 스트리밍 상태는 어떤지 등을 확인할 수 있다. 이 정보는 queue를 구성하고 실행하는 데 활용된다. 앞서 StreamingQueue를 재현한 코드 스니펫에 주석으로 달아놓은 내용과 같이, 각 메소드는 특정 시점에 호출하도록 구현되어 있다. 이를 간단하게 코드로 재현하면 다음과 같다.

type StreamingContextValue = {
  snapshot: Delta;
}

const StreamingContext = React.createContext<StreamingContextValue | null>(null);

function StreamingContextProvider({ children }: Props) {
  const [snapshot, setSnapshot] = useState<Delta>(new Delta()); // UI에서 참조할 snapshot 상태
  const streamingQueue = React.useRef<StreamingQueue | null>(null);

  /**
   * queue 구독
   */
  function subscribeStreamingQueue() {
    if (!streamingQueue.current) return;

    streamingQueue.current.subscribe(async (delta) => {
      // 델타로부터 업데이트된 섹션을 추출
      const section = getUpdatedSection(delta, snapshot);

      if (['STREAMING', 'COMPLETE'].includes(section.status)) {
        // 렌더링할 스냅샷을 새로운 델타로 교체
        setSnapshot(delta);
      }

      if (section.status === 'COMPLETE') {
        // sectionQueue 완료 처리
        streamingQueue.current.completeSectionQueue(section.key);
      }

      if (section.status === 'UNHEALTHY') {
        // queue 구독 해제 및 초기화
        streamingQueue.current.unsubscribe();
        streamingQueue.current = null;
      }
    });
  }

  /**
   * 이벤트 리스너 등록
   */
  function addDeltaEventListener() {
    if (!streamingQueue.current) return;

    // 델타 이벤트를 전달하는 엔진의 tune 인스턴스 가져오기
    const tune = getTune();
    
    tune.addEventListener('VERSION_UPDATED', (delta) => {
      // 델타로부터 업데이트된 섹션을 추출
      const section = getUpdatedSection(delta, snapshot);

      // 새로운 데이터가 들어왔을 때 queue에 추가
      streamingQueue.current.enqueue({
        sectionKey: section.key,
        delta,
      });

      if (['UNHEALTHY', 'COMPLETE'].includes(delta.streamingStatus)) {
        // queue 구독 해제 및 초기화
        streamingQueue.current.unsubscribe();
        streamingQueue.current = null;
      }
    });
  }

  /**
   * 마운트 후 최초 한번 다음 로직들을 실행한다.
   */
  useEffect(() => {
    streamingQueue.current = new StreamingQueue({ sectionKeys });

    subscribeStreamingQueue();
    addDeltaEventListener();
  }, []);

  return (
    <StreamingContext.Provider value={{ snapshot }}>
      {children}
    </StreamingContext.Provider>
  );
}

최종적으로는 실제 UI를 그리는 컴포넌트 레벨에서 StreamingContextsnapshot을 참조하여 렌더링하게 된다. 실제 코드에서는 Delta로부터 snapshot을 구성하는 로직과 get- 프리픽스가 붙은 함수 내부의 각종 복잡한 처리가 포함되어 있지만, 중요한 뼈대에 해당되는 부분만 간추려 보았다. (참고로 Yess 웹앱이 AI 스트리밍을 구현하기 위해 사용하는 Delta에 대해 보다 자세한 내용을 다음 포스트에서 다룰 예정이다.)

목표에 부합하지 않는 부분은 타협하기

안타깝게도 해당 queue 모듈은 설계하고 구현 및 테스트하는 데 시간이 꽤 많이 소요된 것에 비해서 실제 동작을 확인했을 때 장점이 크지 않았다. 앞서 데이터의 흐름을 살펴보며 첨부한 이미지를 보면 queue를 통한 AI 스트리밍 재구성에는 치명적인 문제가 있다. 이미 받은 응답을 인위적으로 queue에 쌓아두었다가 뒤늦게 렌더링하기 때문에 컨텐츠가 완성되는 시점이 실제 응답 완료 시점보다 훨씬 뒤로 밀린다는 것이다. 만약 들어오는 그대로 렌더링한다면 Sections 도식의 선 길이만큼의 시간만 걸리지만 Processed Snapshot 도식의 선 길이는 섹션 개수만큼 배가된 시간이 걸린다. 또한 중간에 특정 데이터 조각에서 에러가 발생하는 경우 해당 섹션보다 뒷 순서에 있는 컨텐츠는 정상적으로 들어왔더라도 사용자에게 전달되지 않는다.

Yess 팀의 목표는 고객이 AI Meeting Cheat-sheet의 가치를 최대한 빠르게 발견하도록 하는 것이다. 이를 고려했을 때 완성되는 시점이 늦어지는 것은 목표에 부합하지 않기 때문에 UI/UX를 타협하기로 했다. 결국 한 페이지에 순차적으로 섹션을 배치하는 것이 아니라 섹션 별로 탭을 나누고, 병렬로 응답이 들어오는 동시에 탭에도 반영하는 방식으로 변경하였다. (AI Meeting Cheat-sheet 데모 참고) 타협한다는 말에 부정적인 느낌이 있지만 오히려 기존의 선택이 적절하지 않았을 경우 빠르게 시정할 수 있는 기회로 볼 수도 있다. 변경된 이후에는 탭별로 컨텐츠가 나뉘어 있어 보다 간결해 보이고, 탭별 스트리밍 상태를 별도 팝업으로 안내하면서 사용자가 진행 정도를 직관적으로 파악할 수 있게 되었다.

탭별 스트리밍 상태 팝업|260

이 시점에 깨달은 것은 AI를 다룰 때는 UX 목표를 보다 구체적으로 정의하는 것이 중요하다는 점이다. 이전 포스트에 적었듯이 타이핑 애니메이션의 명분을 먼저 확인했던 것처럼, 필요한 리소스 대비 얻고자 하는 고객 경험의 정당성이 어느 정도인지 파악해야 한다. 그러나 여기서 무게중심을 두어야 할 부분은 리소스보다는 고객 경험이어야 할 것이다. 예를 들어 자연스러운 대화의 느낌을 주는 것이 목표라면 타이핑 애니메이션이 좋은 방법일 수 있지만, 완성된 컨텐츠를 최대한 빠르게 전달하는 것이 목표라면 성능 최적화에 더 신경을 쓰는 것이 나을 수도 있다. 개발이 수단인 것과 마찬가지로 AI도 수단에 불과하다.

다음 글 AI Sales Copilot 개발 기록 - (3) Delta-Driven UI