기존에 구성되어있던 시스템의 불편함
기존에는 사용하고 있던 Logging System은 Elastic Stack을 통해 구성되어있었습니다. 각 node에 daemonset으로 배포되어 있는 filebeat를 통해 STDOUT으로 출력되는 로그를 수집하고, 버퍼 역할을 하는 Kafka에게 로그를 전달하며 ElasticSearch를 통해 집계하여 Kibana로 대시보드를 구성하여 로그를 확인하는 시스템이었습니다.
하지만 elasticsearch에는 큰 단점으로 느꼈던 점은 사전에 index를 꼭 설정해줘야 한다는 점과, timestamp로 매핑되지 않는다는 것이었습니다. 특히, 개발팀에서 수집이 되어야 하는 로그가 다양하고, key를 추가할수도, 삭제할 수도 있는데 이를 애플리케이션마다 index를 다르게 설정 해야 하는 것과 수집된 로그가 실제로 수집된 시간인지 알 수 없기에 큰 불편함이 존재했습니다.
새로운 시스템 구성
그로인해 새롭게 찾아본 시스템 구성은 Grafana Stack의 Loki였습니다. Loki는 [log를 위한 Prometheus] 라는 목표로 설계되었는데, 인덱스 기반이 아닌 label 기반인 (app="nginx", env="prod") 으로 로그를 관리하기에 사전에 매번 index를 매핑해주지 않아도 되었고, 특히 fluent-bit를 사용하여 k8s에 최적화한 로그 수집을 하여 시스템에 부담없이 구성하도록 하였습니다. 아직은 PoC 단계이지만 구축하면서 공부했던 내용을 정리해보려고 합니다.
구성 Architecture
최종적으로 구성된 아키텍처는 위와 같습니다. deamonset의 형태로 모든 node에 배포된 Fluent-bit이 STDOUT으로 출력된 로그들을 수집하여 Loki로 보냅니다. Loki의 컴포넌트 중 distributor가 fluent-bit로부터 log data를 전달받고 검증한 후 ingester에게 전달합니다. ingester는 log 데이터를 chunks 단위로 쪼갠 후 압축하고 일정 기간동안 보관 후 Object Storage로 flush합니다. compactor는 Object Storage에 저장된 압축된 chunks를 최적화하는 역할을 담당합니다.
사용자가 로그를 조회할 시에 grafana를 통해 접속하여 logQL을 사용하여 label기반으로 log를 질의합니다. 이때 query-frontend가 해당 질의를 받아 분할하며, 효율적인 쿼리를 하기 위해 QueryScheduler가 요청을 스케줄링하여 요청 받은 Querier가 전달된 쿼리를 실행하며 Ingester의 메모리나 Object Storage에서 데이터를 조회합니다.
Loki에서 chunks를 수집하는 방법
Fluent Bit은 Pod에서 출력된 STDOUT 로그, 예를 들어 {"level":30,"time":1736924065183, "hostname":"74cb9f99d6-xrrhs"} 과 같은 로그를 수집합니다. 그런 다음, 로그에 추가적인 정보를 결합하여 {"level":30,"time":1736924065183, "hostname":"74cb9f99d6-xrrhs"} 00:01:23 keystone log body 와 같은 형식으로 Loki의 Distributor로 전송합니다.
distributor에서는 수집된 로그를 검증하게 되는데, 이때 특정 해시값을 부여하여 stream으로 나누게 됩니다.
stream 형식
{
"streams": [
{
"stream": {
"label": "value"
},
"values": [
[ "<unix epoch in nanoseconds>", "<log line>" ],
[ "<unix epoch in nanoseconds>", "<log line>" ]
]
}
]
}
stream의 형식은 위와 같습니다. 보시면 같은 label을 기준으로 stream이 구분되는 것을 확인하실 수 있는데요.
stream 예시
{job="syslog", instance="host1"} 00:00:00 i'm a syslog!
{job="syslog", instance="host1"} 00:00:02 i'm a syslog!
{job="syslog", instance="host2"} 00:00:01 i'm a syslog! <- Accepted, this is a new stream!
{job="syslog", instance="host1"} 00:00:03 i'm a syslog! <- Accepted, still in order for stream 1
{job="syslog", instance="host2"} 00:00:02 i'm a syslog! <- Accepted, still in order for stream 2
{job="syslog", instance="host1"} 00:00:00 i'm a syslog! 로그와 {job="syslog", instance="host1"} 00:00:02 i'm a syslog! 로그는 label이 같기 때문에 같은 stream 1으로 생성됩니다. 하지만 {job="syslog", instance="host2"} 00:00:01 i'm a syslog! 은 label중 instance의 값이 다르기 때문에 새로운 stream 2로 생성되었고, 후에 {job="syslog", instance="host1"} 00:00:03 i'm a syslog! 은 stream 1, {job="syslog", instance="host2"} 00:00:02 i'm a syslog! 은 stream 2로 생성됩니다.
즉, label을 기준으로 stream이 생성되고 같은 label이 없다면 다른 stream으로 생성됩니다. 그러나 label의 수가 너무 많거나 값의 종류가 너무 다양하다면 성능에 부정적인 영향을 미칠 수 있습니다. 이를 '카디널리티'가 높다고 하는데, label을 다양하게 설정하면 검색에는 빠르겠지만 리소스에는 상당한 부하가 있을 수 있습니다. (이때 structured metadata 설정을 통해 해결할 수 있는데, 이 방법은 다음에 한번 정리해보록 하겠습니다...!)
그렇게 distributor는 stream마다 고유의 hash를 부여한 후 chunks로 저장하기 위해 ingester에 전달합니다.
Loki가 Object Storage에 Flush 하는 방법
Loki는 저장 공간을 효율적으로 사용하기 위해 snappy와 같은 압축 알고리즘을 사용하여 데이터 압축을 진행합니다. 이때, chunk가 메모리에서 ObjectStorage로 이동할 때 chunk가 압축되어 저장하고, 다시 쿼리할 때 압축 해제 후 사용합니다.
Chunk-Block 구조 처리 흐름
chunk는 설정해놓은 4가지 시점에 도달하면 Object Storage에 flush가 됩니다.
- chunk_target_size 에 도달
- 최근 flush된 후 chunk_idle_period 도달
- chunks 생성 후 chunk_max_age 도달
- 강제 flush
하지만, 위의 조건들과 일치한다고 해서 즉시 flush 되는 것이 아니라 flush 대기열에 쌓이고 순서대로 flush 됩니다. 위 작업은 Goroutine을 통해 동작하는데 코드를 한번 살펴보겠습니다.
Chunk의 Flush 조건 체크 및 Flush Queue 추가 - sweepUsers, sweepInstance, shouldFlushChunk
위의 함수는 메모리의 모든 stream을 관찰하고 각 stream의 chunks가 flush 조건과 일치하는지 확인 후 일부 chunk가 일치하면 flushing을 위해 queue에 넣는 함수입니다. shouldFlushChunk 함수는 특정 청크가 플러시(Flush) 조건을 만족하는지를 확인하는 함수이고, sweepUsers 함수는 Ingester를 순회하며 sweepInstance를 호출합니다.
그 후 모든 로그 스트림을 대상으로 sweepStream을 실행하고 sweepStream 함수에서 스트림의 마지막 청크를 검사하여 플러시 조건(shouldFlushChunk)을 만족하는지 확인하여 플러시 조건을 만족하면 해당 청크를 플러시 작업 대기열(flushQueues)에 추가합니다. 이때, 플러시 조건을 만족하면 해당 청크를 플러시 작업 대기열(flushQueues)에 추가합니다. (immediate 플래그는 조건을 무시하고 즉각적으로 플러시를 수행할지 여부를 결정합니다.)
Flush Queue 처리 - flushLoop, flushOp
그리고 flushLoop와 flushOp 함수는 flush queue을 관찰하며 대기열에서 Object Storage에 flush합니다. 코드를 살펴보면 i.flushQueues[j].Dequeue()를 호출하여 대기열에서 작업(flushOp)을 꺼내고, 대기열에 작업이 없으면 nil을 반환하고 루프를 종료합니다. 이때, 대기열에거 가져온 작업(flushOp)이 있다면 해당 작업을 i.flushOp(m, op)를 호출하여 처리하고, flush 작업이 실패한 경우(err != nil), 작업을 다시 대기열에 추가(i.flushQueues[j].Enqueue(op))하여 재시도하게 됩니다.
마치며
이번 포스팅에서는 Loki 기반 로깅 및 모니터링 시스템에서 Chunks 생성 및 Flush 동작 방식에 대해 정리해보았습니다. 아직 PoC 단계이지만 구성하면서 공부한 내용들과 실제로 적용한 내용들을 포스팅으로 이어나갈 생각입니다! (Structured Metadata, BloomFilter, retention 등등...)
이 글이 Loki를 활용한 로깅 및 모니터링 시스템 구축을 고민하는 분들께 유익한 정보가 되었기를 바랍니다. 궁금한 사항이나 논의하고 싶은 주제가 있다면 언제든지 댓글로 남겨주세요!
참고
'데브옵스 이야기' 카테고리의 다른 글
자고 일어나니 Argo Project Member? (19) | 2024.11.22 |
---|---|
명심 (3) | 2024.10.06 |
AWS S3 Bucket ACL은 권장사항이 아닙니더! (0) | 2024.08.20 |
[오픈소스 기여하기] 첫번째 기여 - ArgoCD (Implement graceful shutdown in application-controller) (1) | 2024.08.12 |