Skip to content

Latest commit

 

History

History
188 lines (139 loc) · 6.35 KB

009.Pipeline-to-Pipeline_Communication.md

File metadata and controls

188 lines (139 loc) · 6.35 KB

009. Pipeline-to-Pipeline Communication

00. 실습환경

  • Windows 10
  • Elasticsearch 7.10.2
  • Logstash 7.10.2
  • Kibana 7.10.2
  • Filebeat 7.10.2
  • Git Bash

Elasticsearch와 Kibana가 실행된 상태로 아래 실습을 진행해주세요.

01. 실습 파일 준비하기

실습을 진행하기 위해 임의로 생성한 로그 파일을 활용하도록 하겠습니다. 로그 파일은 아래 세 가지 파일을 사용할 예정이며, 로그 내용은 임의로 구성하였습니다.

각 로그 파일을 처리하여 Elasticsearch에 저장하는 Logstash 파일도 준비하였습니다. 이 파일을 사용하여 Pipeline-to-Pipeline Communication 실습을 진행할 예정입니다.

그 다음 데이터를 Logstash로 전달하기 위한 Filebeat 설정을 하겠습니다. access.log, error.log, transaction.log 각각에 대해 설정을 추가하며, muliline을 설정하여 각 로그를 날짜를 기준으로 묶어 보낼 수 있게 설정했습니다.

multiline_defaults: &multiline_defaults
  multiline:
    pattern: '^\d{4}-\d{2}-\d{2}'
    negate: true
    match: after
filebeat.inputs:
- type: log
  enabled: true
  paths:
     - E:/personal/Workspace/TIL/practice/elastic-stack-posting/post009/access.log
  multiline:
    pattern: '^(?:[0-9]{1,3}\.){3}[0-9]{1,3}$|^(?:[0-9a-fA-F]{1,4}:){7}[0-9a-fA-F]{1,4}$'
    negate: false
    match: after
  fields:
    log_type: access

- type: log
  enabled: true
  paths:
     - E:/personal/Workspace/TIL/practice/elastic-stack-posting/post009/error.log
  <<: *multiline_defaults
  fields:
    log_type: error

- type: log
  enabled: true
  paths:
     - E:/personal/Workspace/TIL/practice/elastic-stack-posting/post009/transaction.log
  <<: *multiline_defaults
  fields:
    log_type: transaction

output.logstash:
  hosts: ["localhost:5044"]

이제 실습을 위한 모든 준비가 끝났습니다!

02. Pipeline 설정하기

먼저 Pipeline을 설정하며 logstash.conf 파일을 어떻게 분리할지 정해보겠습니다.

현재 logstash.conf 파일을 Filebeat를 통해 파일을 읽어오고, Elasticsearch로 파일을 내보냅니다. 읽어온 파일은 log_type을 기준으로 각각 다르게 처리되며, log_typetransaction이면서 로그 메시지가 REQUEST로 시작하는 경우에 대해서는 또 다른 방식으로 데이터를 처리합니다. 그리고 모든 데이터는 Elasticsearch로 내보내기 전, 로그에 찍힌 날짜를 data 플러그인을 통해 기본 날짜/시간 포맷으로 변경합니다.

전체 흐름을 정리하면 다음과 같습니다.

image

이 흐름을 바탕으로 pipline.yml에 필요한 Config 파일을 작성하겠습니다.

- pipeline.id: inputstream
  path.config: "../config/pipelines/inputstream.conf"

- pipeline.id: accesspipe
  path.config: "../config/pipelines/accesspipe.conf"

- pipeline.id: errorpipe
  path.config: "../config/pipelines/errorpipe.conf"

- pipeline.id: transactionpipe
  path.config: "../config/pipelines/transactionpipe.conf"

- pipeline.id: requestpipe
  path.config: "../config/pipelines/requestpipe.conf"

- pipeline.id: datepipe
  path.config: "../config/pipelines/datepipe.conf"

- pipeline.id: outputstream
  path.config: "../config/pipelines/outputstream.conf"

03. Config 파일 분리하기

이제 흐름에 맞게 logstash.conf 파일을 분리하도록 하겠습니다. 이 때, outputstream을 제외하고 나머지 Config는 다음 Pipeline으로 데이터를 전달하는 구조로 작성해야 합니다. Pipeline을 사용할 때는, sent_to를 사용하여 다음 Pipeline으로 데이터를 전달하고 address를 사용하여 데이터를 받아옵니다.

먼저 inputstream.conf를 작성하겠습니다. log_type에 따라 각각 다른 Pipeline으로 보낼 수 있게 if-else문을 사용하였습니다.

input {
  beats {
    port => 5044
  }
}

output {
  if [fields][log_type] == "access" {
    pipeline { send_to => accesslog }
  } else if [fields][log_type] == "error" {
    pipeline { send_to => errorlog }
  } else {
    pipeline { send_to => [transactionlog] }
  }
}

그 다음 accesspipe.conf를 작성하겠습니다. send_to로 보냈던 값을 그대로 address에 넣으면 데이터를 받아옵니다. errorpipe.conftransactionpipe.conf는 이와 유사하게 작성하면 되기에 여기서는 생략하였습니다.

input {
  pipeline { address => accesslog }
}

filter {
  dissect {
    mapping => {
      "message" => "%{ip} - - [%{log_datetime} %{}] %{message}"
    }
  }
}

output {
  pipeline { send_to => [dateplugin] }
}

이런 식으로 각 파일을 분리한 다음, Elasticsearch로 내보내는 outputstream.conf는 다음과 같이 작성해줍니다.

input {
  pipeline { address => outputdata }
}

filter {
  ruby {
    code => "event.set('index_day', event.get('datetime').time.localtime('+09:00').strftime('%Y%m%d'))"
  }
}

output {
  elasticsearch {
    hosts => ["localhost:9200"]
    index => "%{[fields][log_type]}-%{index_day}"
  }
}

04. 실행하기

작성한 내용이 정상적으로 동작하는지 확인하기 위해 Logstash와 Filebeat를 실행합니다. 여기서는 실행 명령어에 대해서 생략하도록 하겠습니다.

그 다음 Kibana 대시보드에 접속하여 DevTools를 열고 아래 명령어를 입력합니다.

GET _cat/indices/*-20230810?v

명령어를 실행하면 아래 이미지와 같이 정상적으로 인덱스가 생성된 것을 확인할 수 있습니다.

image

전체 Config 파일 내용은 Github에서 확인하실 수 있습니다.


참고