Logstash를 사용해 데이터를 분할하여 여러 출력으로 전송하기

Logstash는 서버 쪽 데이터 프로세싱 오픈 소스 파이프라인으로, 데이터를 수집하여 변환한 다음, 하나 이상의 출력으로 전송합니다. 이 블로그에서는, Logstash를 사용해 여러 주식 시장으로부터 데이터를 수집해서 고유한 주식 시장 각각에 상응하는 데이터를 고유한 출력으로 전송하는 방법을 보여드리는 예제를 다루려고 합니다. 이를 위해서는 다음 단계를 실행하세요.

  1. 주식 시장 입력 스트림으로부터 각 문서의 사본을 생성합니다.
  2. 각 사본을 필터링하여 해당 주식 시장에 유효한 필드만 포함하도록 합니다.
  3. 각 사본에 메타데이터를 추가하여 어느 주식 시장의 데이터를 포함하고 있는지 표시합니다.
  4. 각 문서의 메타데이터를 평가하여 문서가 정확한 출력을 향하도록 합니다.

이 블로그 포스팅에서는, 파이프라인-투-파이프라인 통신(6.5에서 베타로 제공)을 사용하지 않습니다. 이것도 여기서 설명하는 기능의 일부를 수행할 가능성이 높습니다.

예제 입력 파일

Logstash로의 입력으로, 주식 시장 벤치마크 값을 포함하는 CSV 파일을 사용합니다. 몇 가지 예제 CSV 입력은 다음과 같습니다.

1483230600,1628.75,1678.1,1772.8,2443.6
1483232400,1613.63,1688.5,1750.5,2460.2
1483234200,1606.51,1678.6,1718,2448.2
1483236000,1621.04,1684.1,1708.1,2470.4

쉼표로 분리된 값은 “시간"과 증권거래소 벤치마크 값 “DAX”, “SMI”, “CAC”, “FTSE”를 나타냅니다. 이것을 예제 Logstash 파이프라인에서 입력으로 사용하기 위해 위의 줄을 “stocks.csv”라고 하는 CSV 파일로 복사해서 붙여넣습니다.

예제 Logstash 파이프라인

아래에서 다음을 수행하는 Logstash 파이프라인을 보여드리겠습니다.

  1. CSV 파일로부터 CSV 형식의 입력으로 주식 시장 값을 읽습니다.
  2. CSV 입력의 각 줄을 JSON 문서에 매핑하고, 거기서 CSV 열은 “시간”, “DAX”, “SMI”, “CAC”, “FTSE” 등의 JSON 필드에 매핑합니다.
  3. 시간 필드를 Unix 형식으로 변환합니다.
  4. 복제 필터 플러그인을 사용해 각 문서의 복사본 2개를 생성합니다(이 복사본은 원본 문서 이외의 것입니다). 복제 필터는 자동으로 새로운 문서 사본 각각에 새로운 “유형" 필드를 추가하며, 그 “유형"은 복제 배열에서 부여하는 이름에 상응합니다. 우리는 유형이 “clone_for_SMI” 또는 “clone_for_FTSE”가 되도록 정의했으며, 각 복제본은 궁극적으로는 “SMI” 또는 “FTSE” 주식 시장을 위한 데이터만 포함하게 됩니다.
  5. 각 복제본에 대해,
    1. 정리 필터 플러그인을 사용해 특정 주식 시장의 허용 목록에 포함된 필드를 제외한 모든 필드를 제거합니다.
    2. 복제 기능으로 추가된 “유형"에 상응하는 각 문서에 메타데이터를 추가합니다. 이것은 우리가 복제 기능으로 삽입된 “유형"을 제거하는 정리 기능을 사용하고 있기 때문에 필요합니다. 이 정보는 문서가 정확한 출력으로 향하도록 하기 위해 출력 단계에서 반드시 있어야 합니다.
  6. Logstash용 Elasticsearch 출력 플러그인을 사용해 각 주식 시장에 대한 문서가 다른 Elasticsearch 출력으로 향하도록 작성합니다. 출력은 5단계에서 추가한 메타데이터 필드에서 정의된 값에 의해 결정됩니다. 아래 코드를 간소화하기 위해, 각 Elasticsearch 출력은 로컬 Elasticsearch 클러스터에서 고유한 인덱스를 작성합니다. 여러 클러스터가 출력으로 사용되어야 하는 경우에는, 각 Elasticsearch 출력 선언을 쉽게 수정하여 고유한 Elasticsearch 호스트를 지정할 수 있습니다.

다음은 위의 단계를 수행하는 Logstash 파이프라인입니다(해당 단계 숫자마다 코멘트가 추가되어 있습니다). 수행하려면 이 파이프라인을 "clones.conf"이라고 하는 파일에 복사하세요.

## 1단계
input {
  file {
    # stocks.csv 파일을 사용하려면 경로를 편집해야 합니다
    path => "${HOME}/stocks.csv"
    # 다음은 매번 Logstas가 수행할 때마다 
    # 전체 입력을 다시 읽도록 합니다(디버깅에 유용합니다).
    start_position => "beginning"
    sincedb_path => "/dev/null"
  }
}
## 2단계
filter {
   csv {
    columns => ["time","DAX","SMI","CAC","FTSE"]
    separator => ","
    convert => { 
      'DAX' => 'float'
      'SMI' => 'float'
      'CAC' => 'float'
      'FTSE' => 'float'
    }
  }
## 3단계  
date {
    match => ['time', 'UNIX']
  }
## 4단계
  # 다음 줄은 각 문서의 사본 2개를 
  # 추가로 생성합니다(즉, 원본을 
  # 포함해 총 3개). 
  # 각 사본은 배열에서 부여된 이름에 상응하여 
  # 추가된 “유형" 필드를 자동으로 갖게 됩니다.
  clone {
    clones => ['clone_for_SMI', 'clone_for_FTSE']
  }
## 5단계
  if [type] == 'clone_for_SMI' {
    # "SMI"를 제외한 모든 것을 삭제합니다.
    prune {
       whitelist_names => [ "SMI"]
    }
    mutate {
      add_field => { "[@metadata][type]" => "only_SMI" } 
    }
  } 
  else if [type] == 'clone_for_FTSE' {
    prune {
       whitelist_names => [ "FTSE"]
    }
    mutate {
      add_field => { "[@metadata][type]" => "only_FTSE" } 
    }
  } 
}
## 6단계
output {
  # stdout으로의 다음 출력은 디버깅만을 위한 것이며 
  # 삭제될 수 있습니다.
  stdout { 
    codec =>  rubydebug {
      metadata => true
    }
  }
  if [@metadata][type] == 'only_SMI' {
    elasticsearch {
      index => "smi_data"
    }
  }
  else if [@metadata][type] == 'only_FTSE' {
    elasticsearch {
      index => "ftse_data"
    }
  }
  else {
    elasticsearch {
      index => "stocks_original"
    }
  }
}

Logstash 파이프라인 테스트

예제 CSV 데이터로 이 파이프라인을 테스트하려면, 다음 줄을 수행하여 이를 수정하고 시스템에 대한 정확한 사용 경로를 확보할 수 있습니다. "config.reload.automatic"을 지정하는 것은 선택 사항이지만, Logstash를 다시 시작하지 않고 "clones.conf"를 자동으로 다시 로드할 수 있게 해준다는 점에 유의하세요.

./logstash -f ./clones.conf --config.reload.automatic

일단 Logstash가 "stocks.csv" 파일을 읽고 프로세싱을 완료했으면, 그 결과로 생성되는 "smi_data", "ftse_data", "stocks_original"이라고 하는 3개의 인덱스를 볼 수 있습니다.

SMI 인덱스 확인

GET /smi_data/_search

이것은 다음 구조를 가진 문서를 표시하게 됩니다. "smi_data" 인덱스에는 오직 “SMI” 데이터만 나타나는 것을 눈여겨보세요.

      {
        "_index": "smi_data",
        "_type": "doc",
        "_id": "_QRskWUBsYalOV9y9hGJ",
        "_score": 1,
        "_source": {
          "SMI": 1688.5    
        }
      }

FTSE 인덱스 확인

GET /ftse_data/_search

이것은 다음 구조를 가진 문서를 표시하게 됩니다. "ftse_data" 인덱스에는 오직 “FTSE”필드만 문서에 나타나는 것을 눈여겨보세요.

      {
        "_index": "ftse_data",
        "_type": "doc",
        "_id": "AgRskWUBsYalOV9y9hL0",
        "_score": 1,
        "_source": {
          "FTSE": 2448.2
        }
      }

원본 문서 인덱스 확인

GET /stocks_originals/_search

이것은 다음 구조를 가진 문서를 표시하게 됩니다. "stocks_original" 인덱스에는 필터링되지 않은 버전의 문서 원본이 나타나는 것을 눈여겨보세요.

      {
        "_index": "stocks_original",
        "_type": "doc",
        "_id": "-QRskWUBsYalOV9y9hFo",
        "_score": 1,
        "_source": {
          "host": "Alexanders-MBP",
          "@timestamp": "2017-01-01T00:30:00.000Z",
          "SMI": 1678.1,
          "@version": "1",
          "message": "1483230600,1628.75,1678.1,1772.8,2443.6",
          "CAC": 1772.8,
          "DAX": 1628.75,
          "time": "1483230600",
          "path": "/Users/arm/Documents/ES6.3/datasets/stocks_for_clones.csv",
          "FTSE": 2443.6
        }
      }

결론

이 블로그에서는 Logstash 기능의 작은 부분을 시연해 보여드렸습니다. 특히, Logstash를 사용해 여러 주식 시장으로부터 데이터를 수집한 다음, 그 데이터를 처리해 고유한 출력으로 전송하는 방법을 보여주는 예제를 소개했습니다. Logstash와 Elastic Stack을 테스트하면서 질문이 있으시면, 언제든지 저희 공개 토론 포럼에서 도움을 요청해주세요.