들어가며
MCP (Model Context Protocol) Server를 개발하면서 Databricks SQL로 대용량 데이터셋을 쿼리해야 하는 요구사항이 들어왔습니다. 자연스럽게 공식 Databricks Node.js SDK를 사용하려 했지만, 곧 문제에 부딪혔습니다. 대용량 결과를 처리할 방법이 없었습니다.
이 글에서는 이 문제를 해결하기 위해 두 개의 라이브러리(@bitofsky/merge-streams, @bitofsky/databricks-sql)를 개발한 과정과, 그 과정에서 얻은 인사이트를 공유하고자 합니다.
문제: SDK External Links 미지원
SDK에선 fetchChunk로 대량 result set을 부분적으로 처리하도록 callback을 제공하지만, 대량 데이터를 MCP Client로 이렇게 내려줄 수는 없습니다. 이를 위해선 External Links가 필요하지만 이는 SDK에서 지원되지 않으며 API에서만 사용이 가능합니다.
API INLINE vs EXTERNAL_LINKS
Databricks Statement Execution API는 쿼리 결과를 반환하는 두 가지 방식을 제공합니다:
INLINE 방식
// 전체 결과가 응답에 포함됨
{
"result": {
"data_array": [
[1, "Alice", 30],
[2, "Bob", 25],
// ... 모든 행
]
}
}제한사항: 최대 25MB까지만 지원
이 방식의 문제는 명확합니다:
- 25MB 이상의 결과는 받을 수 없음
EXTERNAL_LINKS 방식
// 결과가 여러 청크로 분할되어 presigned URL로 제공됨
{
"result": {
"external_links": [
{ "external_link": "https://s3.../chunk_0.json", ... },
{ "external_link": "https://s3.../chunk_1.json", ... },
// ... 수십~수백 개의 청크
]
}
}제한사항: 최대 100GB까지 지원 (INLINE의 4000배!)
이 방식의 장점:
- 대용량 결과 처리 가능 (최대 100GB)
- 메모리 효율적 (스트리밍 처리)
- 각 청크를 병렬로 다운로드 가능
선택의 기로
두 가지 선택지가 있었습니다:
- SDK를 사용하고 큰 Result 결과를 포기
- Databricks API를 직접 구현하여 External Links 사용
MCP Server의 요구사항을 생각해보니:
- 수백만 건의 레코드를 다뤄야 함
- Databricks가 지원하는 최대 100GB까지 반환 가능하게 하고 싶음
선택은 명확했습니다. Databricks API를 사용한 라이브러리를 직접 구현하기로 결정했습니다.
직접 구현된 라이브러리를 사용하면서 공식 SDK도 병행하게 되면 유지보수가 복잡해질 수 있기 때문에, 라이브러리는 공식 SDK의 거의 모든 기능을 대체하도록 설계했습니다.
- 쿼리 실행 및 Statement 상태 추적
- 결과 스트리밍
- 에러 처리 & 자동 재시도
- Result Set 파싱
새로운 문제: 청크 지옥
Databricks API를 직접 구현하니 External Links를 사용할 수 있게 되었습니다. 하지만 새로운 문제가 나타났습니다.
chunk_0.arrow (presigned URL)
chunk_1.arrow (presigned URL)
chunk_2.arrow (presigned URL)
...
chunk_89.arrow (presigned URL)100만 건 쿼리 결과가 90개의 청크로 쪼개져 왔습니다. 이제 무엇을 해야 할까요?
- 90개의 URL을 클라이언트에 그대로 전달?
- 90개의 Presigned URL이 LLM Context를 순식간에 고갈시킴
- 클라이언트가 90번의 HTTP 요청을 해야 함
- 각 청크를 순서대로 올바르게 병합하거나 고려하여 쿼리해야 함
- 포맷별 처리 로직 필요 (CSV 헤더? JSON 배열? Arrow EOS 마커?)
- 서버에서 순서와 포멧을 유지한 올바른 병합 후 단일 URL 제공?
- 훨씬 나은 사용자 경험
- 하지만 직접 구현해야 함
명백히 두 번째가 더 나은 선택이었습니다.
또 다른 문제: 압축 미지원
서버에서 병합하기로 결정했지만, 곧 또 다른 제약을 발견했습니다. Databricks의 External Links는 압축을 지원하지 않습니다.
External Links는 최대 100GB까지 지원하지만, 모두 압축되지 않은 상태입니다:
chunk_0.csv (20MB, 압축되지 않음)
chunk_1.csv (20MB, 압축되지 않음)
...
chunk_89.csv (20MB, 압축되지 않음)
총 크기: 1.8GB (최대 100GB까지 가능)SQL 쿼리 결과는 대부분 문자열 데이터입니다. 이런 데이터는 압축률이 매우 높습니다:
- CSV: 컬럼명 반복, 유사한 패턴
- JSON: 키 이름 반복, 공백 문자
- 실제 압축률: 평균 1/10 이하
클라이언트에게 1.8GB 파일을 다운로드하게 할 것인가, 아니면 180MB만 다운로드하게 할 것인가?
병합 과정에서 gzip 압축을 적용하면 이 문제를 해결할 수 있습니다:
- 클라이언트 다운로드 시간 10배 단축
- 네트워크 비용 90% 절감
- 더 나은 사용자 경험
리서치: 포맷별 병합 문제
청크를 단순히 연결하면 될까요? 아닙니다. 각 포맷마다 고유한 문제가 있습니다.
Databricks API는 3가지 주요 포맷을 지원합니다:
- CSV
- JSON_ARRAY
- ARROW_STREAM
각 포맷별 병합 문제를 살펴보겠습니다.
CSV: 중복 헤더 문제
# chunk_0.csv
name,age
Alice,30
Bob,25
# chunk_1.csv
name,age
Charlie,35
David,40단순 연결하면:
name,age
Alice,30
Bob,25
name,age ← 중복!
Charlie,35
David,40해결: 첫 번째 청크의 헤더만 유지하고 나머지는 스킵해야 합니다.
JSON_ARRAY: 유효하지 않은 JSON
// chunk_0.json
[{"id":1},{"id":2}]
// chunk_1.json
[{"id":3},{"id":4}]단순 연결하면:
[{"id":1},{"id":2}][{"id":3},{"id":4}] // 유효하지 않음!해결: 브래킷을 제거하고 쉼표로 연결한 뒤 전체를 배열로 감싸야 합니다.
ARROW_STREAM: EOS 마커 문제
Arrow IPC 스트림은 각 청크가 독립적인 스트림입니다:
chunk_0: [Schema][RecordBatch][RecordBatch][EOS]
chunk_1: [Schema][RecordBatch][RecordBatch][EOS]대부분의 Arrow 리더는 첫 번째 EOS 마커에서 읽기를 중단합니다.
해결: 각 청크에서 RecordBatch만 추출하여 새로운 단일 IPC 스트림으로 재인코딩해야 합니다.
해결책: 두 개의 라이브러리
이 문제를 해결하기 위해 두 개의 라이브러리를 분리하여 개발했습니다.
1. @bitofsky/merge-streams
범용 스트림 병합 라이브러리. Databricks에 종속되지 않으며, CSV, JSON_ARRAY, ARROW_STREAM 포맷의 청크를 올바르게 병합합니다.
2. @bitofsky/databricks-sql
Databricks 전용 SQL 클라이언트. Databricks API를 직접 호출하고 merge-streams를 활용하여 External Links를 처리합니다.
@bitofsky/merge-streams: 포맷 인식 스트림 병합
핵심 가치
"When Databricks gives you 90+ presigned URLs, merge them into one."
이 라이브러리는 하나의 문제에 집중합니다: 여러 청크 스트림을 포맷에 맞게 올바르게 병합하는 것.
주요 특징
1. 포맷별 병합
import { mergeStreamsFromUrls } from '@bitofsky/merge-streams'
import { createWriteStream } from 'fs'
// CSV 병합 - 자동으로 중복 헤더 제거
await mergeStreamsFromUrls('CSV', {
urls: [
'https://example.com/chunk_0.csv',
'https://example.com/chunk_1.csv',
'https://example.com/chunk_2.csv',
],
output: createWriteStream('merged.csv'),
})
// JSON 병합 - 올바른 JSON 배열 생성
await mergeStreamsFromUrls('JSON_ARRAY', {
urls: [/* ... */],
output: createWriteStream('merged.json'),
})
// Arrow 병합 - RecordBatch 재인코딩
await mergeStreamsFromUrls('ARROW_STREAM', {
urls: [/* ... */],
output: createWriteStream('merged.arrow'),
})2. 메모리 효율적 스트리밍
전체 파일을 메모리에 로드하지 않습니다. 청크별로 스트리밍 처리하므로:
테스트: 1GB 데이터 (50개 청크)
- 전체 로드 방식: ~1GB 메모리
- merge-streams: ~30MB 메모리
- 절감: 약 97%3. Pre-connection 최적화
// 현재 청크 스트리밍 중에 다음 청크 연결을 미리 시작
현재 청크: chunk_0 =====> output
다음 청크: chunk_1 [connecting...]이 최적화로 청크 간 HTTP Connection 수립에 필요한 대기 시간이 거의 제거됩니다:
순차 처리: 50개 × 200ms 대기 = 10초
Pre-connection: 첫 청크 200ms + 나머지 ~0ms = 약 1초
개선: 약 10배4. 진행 상황 추적
await mergeStreamsFromUrls('CSV', {
urls,
output,
onProgress: ({ inputIndex, totalInputs, inputedBytes, mergedBytes }) => {
const percent = ((inputIndex + 1) / totalInputs * 100).toFixed(1)
console.log(`Progress: ${percent}% (${(inputedBytes / 1024 / 1024).toFixed(2)} MB read)`)
},
})5. 취소 가능한 작업
const controller = new AbortController()
setTimeout(() => controller.abort(), 5000) // 5초 후 취소
await mergeStreamsFromUrls('CSV', {
urls,
output,
signal: controller.signal,
})실제 사용 예: Databricks + S3 + gzip compression
import { mergeStreamsFromUrls } from '@bitofsky/merge-streams'
import { Upload } from '@aws-sdk/lib-storage'
import { createGzip } from 'zlib'
// Databricks External Links 병합 후 S3에 압축 업로드
const gzip = createGzip()
const passThrough = new PassThrough()
const upload = new Upload({
client: s3,
params: {
Bucket: 'my-bucket',
Key: 'merged-data.csv.gz',
Body: passThrough,
ContentEncoding: 'gzip',
},
})
const [ result ] = await Promise.all([
mergeStreamsFromUrls('CSV', {
urls: databricksChunkUrls,
output: gzip,
}),
pipeline(gzip, passThrough),
upload.done(),
])
// 이제 단일 S3 URL을 클라이언트에 제공
result.externalLinks[0].external_link;@bitofsky/databricks-sql: 경량 Databricks API 클라이언트
설계 목표
- 공식 SDK 없이 Databricks SQL 직접 사용
- 최적화된 폴링 메커니즘
- 메모리 효율적인 스트리밍
- External Links 완전 지원
- Query Metrics 통합
주요 기능
1. 최적화된 폴링 메커니즘
Databricks Statement Execution API는 wait_timeout 파라미터를 지원합니다. 서버에서 최대 50초까지 대기하며, 결과가 준비되면 즉시 응답합니다.
// wait_timeout=50s로 폴링 최소화
const result = await executeStatement(query, auth)
// 진행 상황을 보고 싶다면?
const result = await executeStatement(query, auth, {
wait_timeout: '0s', // 즉시 응답
onProgress: (result) => {
console.log(`State: ${result.status.state}`)
},
})2. Query Metrics 지원
Query History API를 통합하여 실시간 실행 메트릭을 제공합니다:
const result = await executeStatement(query, auth, {
enableMetrics: true,
onProgress: (result, metrics) => {
console.log(`State: ${result.status.state}`)
if (metrics) {
console.log(` Time: ${metrics.execution_time_ms}ms`)
console.log(` Rows: ${metrics.rows_produced_count}`)
console.log(` Bytes: ${metrics.read_bytes}`)
}
},
})3. 다양한 결과 소비 방식
// 1. 전체 수집 (소규모 결과)
const rows = await fetchAll(result, auth, { format: 'JSON_OBJECT' })
console.log(rows) // [{ id: 1, name: "Alice" }, ...]
// 2. 스트리밍 (대규모 결과)
await fetchRow(result, auth, {
format: 'JSON_OBJECT',
onEachRow: (row) => {
console.log(row.id, row.name)
// DB에 삽입, 파일에 쓰기 등
},
})
// 3. 바이너리 스트림 (파일 저장)
const stream = fetchStream(result, auth)
await pipeline(stream, createWriteStream('output.csv'))4. JSON_OBJECT: 스키마 기반 행 매핑
Databricks는 기본적으로 배열 형식으로 결과를 반환합니다:
// JSON_ARRAY 형식
[1, "Alice", 30, "2024-01-01T00:00:00Z"]JSON_OBJECT 옵션으로 스키마 기반 객체로 변환할 수 있습니다:
const rows = await fetchAll(result, auth, {
format: 'JSON_OBJECT',
})
// 결과
[
{
id: 1n, // BIGINT → bigint
name: "Alice", // STRING → string
age: 30, // INT → number
created_at: "2024-01-01T00:00:00.000Z" // TIMESTAMP → ISO 8601 Format string
}
]커스텀 변환도 가능합니다:
const rows = await fetchAll(result, auth, {
format: 'JSON_OBJECT',
encodeBigInt: (v: bigint) => Number(v), // bigint → number
encodeTimestamp: (v: string) => new Date(v), // string → Date
})5. mergeExternalLinks: 단일 URL로 병합 + 압축
External Links를 S3 등에 업로드하고 단일 presigned URL을 반환합니다. 여기서 핵심은 병합 과정에서 압축을 적용할 수 있다는 점입니다.
Databricks External Links는 압축을 지원하지 않지만, 이 라이브러리를 사용하면 병합과 동시에 gzip 압축을 적용할 수 있습니다. 특히 CSV나 JSON 같은 텍스트 기반 포맷은 압축률이 매우 높아서 파일 크기를 1/10 이하로 줄일 수 있습니다:
const result = await executeStatement(
'SELECT * FROM large_table LIMIT 100000',
auth,
{ disposition: 'EXTERNAL_LINKS', format: 'CSV' }
)
const merged = await mergeExternalLinks(result, auth, {
mergeStreamToExternalLink: async (stream) => {
// S3에 gzip 압축 업로드
const key = `merged-${Date.now()}.csv.gz`
const gzip = createGzip()
const passThrough = new PassThrough()
const upload = new Upload({
client: s3,
params: {
Bucket: bucket,
Key: key,
Body: passThrough,
ContentType: 'text/csv',
ContentEncoding: 'gzip',
},
})
await Promise.all([
pipeline(stream, gzip, passThrough),
upload.done(),
])
// presigned URL 생성
const externalLink = await getSignedUrl(
s3,
new GetObjectCommand({ Bucket: bucket, Key: key }),
{ expiresIn: 3600 }
)
const head = await s3.send(new HeadObjectCommand({ Bucket: bucket, Key: key }))
return {
externalLink,
byte_count: head.ContentLength ?? 0,
expiration: new Date(Date.now() + 3600000).toISOString(),
}
},
})
// 이제 하나의 압축된 URL만 반환
console.log(merged.result.external_links[0].external_link)
// https://my-bucket.s3.amazonaws.com/merged-...csv.gz?X-Amz-...
// 실제 효과 예시:
// Databricks External Links: 90개 청크 × 20MB = 1.8GB (압축 안됨)
// 병합 + gzip: 단일 파일 180MB (약 90% 절감)6. Partial External Links 자동 처리
Databricks가 일부 청크 메타데이터만 반환하는 경우가 있습니다:
{
"external_links": [
{ "external_link": "https://...", "chunk_index": 0 },
null, // chunk_index 1은 누락
null, // chunk_index 2는 누락
// ...
]
}라이브러리가 자동으로 누락된 청크를 getStatementResultChunks API로 가져옵니다. 사용자는 신경 쓸 필요가 없습니다.
간단한 사용 예
import { executeStatement, fetchAll } from '@bitofsky/databricks-sql'
const auth = {
token: process.env.DATABRICKS_TOKEN!,
host: process.env.DATABRICKS_HOST!, // e.g., abc.cloud.databricks.com
httpPath: process.env.DATABRICKS_HTTP_PATH!, // e.g., /sql/1.0/warehouses/...
}
// 쿼리 실행
const result = await executeStatement('SELECT * FROM my_table LIMIT 10', auth)
// 결과 가져오기
const rows = await fetchAll(result, auth, { format: 'JSON_OBJECT' })
console.log(rows)실제 적용 사례
1. MCP Server 통합
class DatabricksMCPServer {
async executeQuery(query: string): Promise<QueryResponse> {
// 쿼리 실행
const result = await executeStatement(query, this.auth, {
disposition: 'EXTERNAL_LINKS',
format: 'JSON_ARRAY',
enableMetrics: true,
onProgress: (result, metrics) => {
this.sendProgressUpdate({
state: result.status.state,
rowsProduced: metrics?.rows_produced_count,
})
},
})
// 결과 크기에 따라 처리 방식 결정
const rowCount = result.result?.row_count ?? 0
if (rowCount < 1000) {
// 작은 결과: 직접 반환
const rows = await fetchAll(result, this.auth, {
format: 'JSON_OBJECT',
})
return { type: 'inline', data: rows }
} else {
// 큰 결과: S3에 업로드 후 URL 반환
const merged = await mergeExternalLinks(result, this.auth, {
mergeStreamToExternalLink: async (stream) => {
return await this.uploadToS3(stream)
},
})
return {
type: 'external',
url: merged.result.external_links[0].external_link,
rowCount,
}
}
}
}MCP 클라이언트는 이제:
- 작은 결과는 즉시 받아서 사용
- 큰 결과는 하나의 URL로 다운로드
2. 월간 리포트 생성 (압축으로 이메일 첨부 용량 절감)
async function generateMonthlyReport(year: number, month: number) {
const result = await executeStatement(
`
SELECT
date_trunc('day', order_date) as day,
product_category,
SUM(revenue) as total_revenue
FROM sales
WHERE YEAR(order_date) = :year AND MONTH(order_date) = :month
GROUP BY day, product_category
ORDER BY day, product_category
`,
auth,
{
disposition: 'EXTERNAL_LINKS',
format: 'CSV',
parameters: [
{ name: 'year', value: String(year), type: 'INT' },
{ name: 'month', value: String(month), type: 'INT' },
],
}
)
// CSV 압축 및 저장
// Databricks는 압축을 지원하지 않지만, 병합 과정에서 gzip 적용
const filename = `/tmp/report-${year}-${month}.csv.gz`
const stream = fetchStream(result, auth)
await pipeline(stream, createGzip(), createWriteStream(filename))
// 압축 효과: 200MB → 20MB (이메일 첨부 제한 해결)
const stats = await fs.stat(filename)
console.log(`Report size: ${(stats.size / 1024 / 1024).toFixed(2)} MB (compressed)`)
// 이메일 전송
await sendEmail({
to: 'team@example.com',
subject: `Monthly Report ${year}-${month}`,
attachments: [{ filename, path: filename }],
})
}개발 과정에서 배운 것들
1. Arrow IPC 스트림의 복잡성
Apache Arrow는 강력하지만 학습 곡선이 있습니다. RecordBatch vs Table의 차이, IPC 스트림 포맷, EOS 마커 등을 이해하는 데 시간이 걸렸습니다만, AI 도구의 도움으로 빠르게 학습해 적용할 수 있었습니다.
2. 스트림 파이프라인 디버깅
Node.js 스트림은 강력하지만 디버깅이 어렵습니다. 특히:
- 메모리 누수 추적
- 백프레셔 처리
- 에러 전파 보장
이를 해결하기 위해 통합 테스트를 많이 작성했습니다.
3. Databricks API의 비문서화된 동작
일부 API 동작이 문서화되지 않았습니다. 예를 들어:
- Query History API의 메트릭 인자
- 청크 갯수, 크기 결정 로직
결론
공식 SDK의 제약으로 시작된 프로젝트가 두 개의 오픈소스 라이브러리로 완성되었습니다.
@bitofsky/merge-streams는:
- CSV, JSON, Arrow 포맷의 청크를 올바르게 병합
- 메모리 효율적인 스트리밍
- Pre-connection 최적화
- 범용적으로 사용 가능
@bitofsky/databricks-sql는:
- Databricks API 직접 호출
- 최적화된 폴링 메커니즘
- 다양한 결과 소비 방식
- External Links 완전 지원
- Query Metrics 통합
비슷한 문제를 겪고 있다면, 이 라이브러리들이 도움이 되길 바랍니다.
아직 Production Ready를 보장하지는 않습니다. Production 환경에서 사용하려면 먼저 충분한 테스트를 권장합니다.
'개발 > NodeJS' 카테고리의 다른 글
| Typescript compilerHost를 사용해 컴파일 할 때 주의사항 (0) | 2024.03.20 |
|---|---|
| pnpm 심볼릭 링크 버그 수정하기 (0) | 2024.02.26 |