Logstash를 사용해 데이터를 분할하여 여러 출력으로 전송하기
Logstash는 서버 쪽 데이터 프로세싱 오픈 소스 파이프라인으로, 데이터를 수집하여 변환한 다음, 하나 이상의 출력으로 전송합니다. 이 블로그에서는, Logstash를 사용해 여러 주식 시장으로부터 데이터를 수집해서 고유한 주식 시장 각각에 상응하는 데이터를 고유한 출력으로 전송하는 방법을 보여드리는 예제를 다루려고 합니다. 이를 위해서는 다음 단계를 실행하세요.
- 주식 시장 입력 스트림으로부터 각 문서의 사본을 생성합니다.
- 각 사본을 필터링하여 해당 주식 시장에 유효한 필드만 포함하도록 합니다.
- 각 사본에 메타데이터를 추가하여 어느 주식 시장의 데이터를 포함하고 있는지 표시합니다.
- 각 문서의 메타데이터를 평가하여 문서가 정확한 출력을 향하도록 합니다.
이 블로그 포스팅에서는, 파이프라인-투-파이프라인 통신(6.5에서 베타로 제공)을 사용하지 않습니다. 이것도 여기서 설명하는 기능의 일부를 수행할 가능성이 높습니다.
예제 입력 파일
Logstash로의 입력으로, 주식 시장 벤치마크 값을 포함하는 CSV 파일을 사용합니다. 몇 가지 예제 CSV 입력은 다음과 같습니다.
1483230600,1628.75,1678.1,1772.8,2443.6 1483232400,1613.63,1688.5,1750.5,2460.2 1483234200,1606.51,1678.6,1718,2448.2 1483236000,1621.04,1684.1,1708.1,2470.4
쉼표로 분리된 값은 “시간"과 증권거래소 벤치마크 값 “DAX”, “SMI”, “CAC”, “FTSE”를 나타냅니다. 이것을 예제 Logstash 파이프라인에서 입력으로 사용하기 위해 위의 줄을 “stocks.csv”라고 하는 CSV 파일로 복사해서 붙여넣습니다.
예제 Logstash 파이프라인
아래에서 다음을 수행하는 Logstash 파이프라인을 보여드리겠습니다.
- CSV 파일로부터 CSV 형식의 입력으로 주식 시장 값을 읽습니다.
- CSV 입력의 각 줄을 JSON 문서에 매핑하고, 거기서 CSV 열은 “시간”, “DAX”, “SMI”, “CAC”, “FTSE” 등의 JSON 필드에 매핑합니다.
- 시간 필드를 Unix 형식으로 변환합니다.
- 복제 필터 플러그인을 사용해 각 문서의 복사본 2개를 생성합니다(이 복사본은 원본 문서 이외의 것입니다). 복제 필터는 자동으로 새로운 문서 사본 각각에 새로운 “유형" 필드를 추가하며, 그 “유형"은 복제 배열에서 부여하는 이름에 상응합니다. 우리는 유형이 “clone_for_SMI” 또는 “clone_for_FTSE”가 되도록 정의했으며, 각 복제본은 궁극적으로는 “SMI” 또는 “FTSE” 주식 시장을 위한 데이터만 포함하게 됩니다.
- 각 복제본에 대해,
- 정리 필터 플러그인을 사용해 특정 주식 시장의 허용 목록에 포함된 필드를 제외한 모든 필드를 제거합니다.
- 복제 기능으로 추가된 “유형"에 상응하는 각 문서에 메타데이터를 추가합니다. 이것은 우리가 복제 기능으로 삽입된 “유형"을 제거하는 정리 기능을 사용하고 있기 때문에 필요합니다. 이 정보는 문서가 정확한 출력으로 향하도록 하기 위해 출력 단계에서 반드시 있어야 합니다.
- Logstash용 Elasticsearch 출력 플러그인을 사용해 각 주식 시장에 대한 문서가 다른 Elasticsearch 출력으로 향하도록 작성합니다. 출력은 5단계에서 추가한 메타데이터 필드에서 정의된 값에 의해 결정됩니다. 아래 코드를 간소화하기 위해, 각 Elasticsearch 출력은 로컬 Elasticsearch 클러스터에서 고유한 인덱스를 작성합니다. 여러 클러스터가 출력으로 사용되어야 하는 경우에는, 각 Elasticsearch 출력 선언을 쉽게 수정하여 고유한 Elasticsearch 호스트를 지정할 수 있습니다.
다음은 위의 단계를 수행하는 Logstash 파이프라인입니다(해당 단계 숫자마다 코멘트가 추가되어 있습니다). 수행하려면 이 파이프라인을 "clones.conf"이라고 하는 파일에 복사하세요.
## 1단계 input { file { # stocks.csv 파일을 사용하려면 경로를 편집해야 합니다 path => "${HOME}/stocks.csv" # 다음은 매번 Logstas가 수행할 때마다 # 전체 입력을 다시 읽도록 합니다(디버깅에 유용합니다). start_position => "beginning" sincedb_path => "/dev/null" } } ## 2단계 filter { csv { columns => ["time","DAX","SMI","CAC","FTSE"] separator => "," convert => { 'DAX' => 'float' 'SMI' => 'float' 'CAC' => 'float' 'FTSE' => 'float' } } ## 3단계 date { match => ['time', 'UNIX'] } ## 4단계 # 다음 줄은 각 문서의 사본 2개를 # 추가로 생성합니다(즉, 원본을 # 포함해 총 3개). # 각 사본은 배열에서 부여된 이름에 상응하여 # 추가된 “유형" 필드를 자동으로 갖게 됩니다. clone { clones => ['clone_for_SMI', 'clone_for_FTSE'] } ## 5단계 if [type] == 'clone_for_SMI' { # "SMI"를 제외한 모든 것을 삭제합니다. prune { whitelist_names => [ "SMI"] } mutate { add_field => { "[@metadata][type]" => "only_SMI" } } } else if [type] == 'clone_for_FTSE' { prune { whitelist_names => [ "FTSE"] } mutate { add_field => { "[@metadata][type]" => "only_FTSE" } } } } ## 6단계 output { # stdout으로의 다음 출력은 디버깅만을 위한 것이며 # 삭제될 수 있습니다. stdout { codec => rubydebug { metadata => true } } if [@metadata][type] == 'only_SMI' { elasticsearch { index => "smi_data" } } else if [@metadata][type] == 'only_FTSE' { elasticsearch { index => "ftse_data" } } else { elasticsearch { index => "stocks_original" } } }
Logstash 파이프라인 테스트
예제 CSV 데이터로 이 파이프라인을 테스트하려면, 다음 줄을 수행하여 이를 수정하고 시스템에 대한 정확한 사용 경로를 확보할 수 있습니다. "config.reload.automatic"을 지정하는 것은 선택 사항이지만, Logstash를 다시 시작하지 않고 "clones.conf"를 자동으로 다시 로드할 수 있게 해준다는 점에 유의하세요.
./logstash -f ./clones.conf --config.reload.automatic
일단 Logstash가 "stocks.csv" 파일을 읽고 프로세싱을 완료했으면, 그 결과로 생성되는 "smi_data", "ftse_data", "stocks_original"이라고 하는 3개의 인덱스를 볼 수 있습니다.
SMI 인덱스 확인
GET /smi_data/_search
이것은 다음 구조를 가진 문서를 표시하게 됩니다. "smi_data" 인덱스에는 오직 “SMI” 데이터만 나타나는 것을 눈여겨보세요.
{ "_index": "smi_data", "_type": "doc", "_id": "_QRskWUBsYalOV9y9hGJ", "_score": 1, "_source": { "SMI": 1688.5 } }
FTSE 인덱스 확인
GET /ftse_data/_search
이것은 다음 구조를 가진 문서를 표시하게 됩니다. "ftse_data" 인덱스에는 오직 “FTSE”필드만 문서에 나타나는 것을 눈여겨보세요.
{ "_index": "ftse_data", "_type": "doc", "_id": "AgRskWUBsYalOV9y9hL0", "_score": 1, "_source": { "FTSE": 2448.2 } }
원본 문서 인덱스 확인
GET /stocks_originals/_search
이것은 다음 구조를 가진 문서를 표시하게 됩니다. "stocks_original" 인덱스에는 필터링되지 않은 버전의 문서 원본이 나타나는 것을 눈여겨보세요.
{ "_index": "stocks_original", "_type": "doc", "_id": "-QRskWUBsYalOV9y9hFo", "_score": 1, "_source": { "host": "Alexanders-MBP", "@timestamp": "2017-01-01T00:30:00.000Z", "SMI": 1678.1, "@version": "1", "message": "1483230600,1628.75,1678.1,1772.8,2443.6", "CAC": 1772.8, "DAX": 1628.75, "time": "1483230600", "path": "/Users/arm/Documents/ES6.3/datasets/stocks_for_clones.csv", "FTSE": 2443.6 } }
결론
이 블로그에서는 Logstash 기능의 작은 부분을 시연해 보여드렸습니다. 특히, Logstash를 사용해 여러 주식 시장으로부터 데이터를 수집한 다음, 그 데이터를 처리해 고유한 출력으로 전송하는 방법을 보여주는 예제를 소개했습니다. Logstash와 Elastic Stack을 테스트하면서 질문이 있으시면, 언제든지 저희 공개 토론 포럼에서 도움을 요청해주세요.