Importing data from JDBC with Logstash

Posted on 2021-08-06 18:00 in Java

One-time import

Logstash is built on JRuby, so it can use JDBC as well, which makes importing data from a database quite convenient.

Below is an example, which also adds IP-address-to-geolocation resolution:

input {
  jdbc {
    jdbc_validate_connection => true
    jdbc_connection_string => "jdbc:oracle:thin:@192.168.100.1:1521:orcl"
    jdbc_user => "read"
    jdbc_password => "readonly"
    jdbc_driver_library => "D:\ojdbc6.jar"
    jdbc_driver_class => "Java::oracle.jdbc.driver.OracleDriver"
    jdbc_paging_enabled => true
    statement => "select t.log_id, t.log_time, t.client_ip, t.user_agent, t.browser, t.browser_type, t.browser_major_version, t.device_type, t.platform, t.platform_version from sys_log t
                  where EXTRACT(year from t.log_time) > 2020"
    lowercase_column_names => false
  }
}
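
With jdbc_paging_enabled, the plugin wraps the statement in paged queries so the whole result set is never held in memory at once. The page size defaults to 100000 rows and can be tuned with jdbc_page_size if that is too large for your environment; a sketch:

jdbc {
  # ... connection settings as above ...
  jdbc_paging_enabled => true
  jdbc_page_size => 50000
}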

filter {
  mutate {
    rename => {
      "LOG_ID" => "logId"
      "LOG_TIME" => "@timestamp"
      "CLIENT_IP" => "ip"
      "USER_AGENT" => "ua"
      "BROWSER" => "browser"
      "BROWSER_TYPE" => "browserType"
      "BROWSER_MAJOR_VERSION" => "browserMajorVersion"
      "DEVICE_TYPE" => "deviceType"
      "PLATFORM" => "platform"
      "PLATFORM_VERSION" => "platformVersion"
    }
  }
  geoip {
    source => "ip"
    database => "D:\logstash-7.3.1\GeoLite2-City_20210804\GeoLite2-City.mmdb"
  }
}
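
The geoip filter looks the "ip" field up in the GeoLite2 database and writes the result (country, city, coordinates, and so on) under the event's geoip field. If only a few attributes are needed, the filter's fields option can limit the output; a minimal sketch using standard geoip field names:

geoip {
  source => "ip"
  database => "D:\logstash-7.3.1\GeoLite2-City_20210804\GeoLite2-City.mmdb"
  # keep only these attributes from the lookup
  fields => ["country_name", "city_name", "location"]
}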

output {
  stdout { codec => rubydebug }

  if "_jsonparsefailure" not in [tags] {
    elasticsearch {
      hosts => ["192.168.100.2:9200"]
      index => "sys-log-2021"
    }
  }
}
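
Save the pipeline to a file (the name jdbc-import.conf below is arbitrary) and run it once; on Windows, matching the paths above:

D:\logstash-7.3.1\bin\logstash.bat -f jdbc-import.conf

The pipeline prints every event to stdout via rubydebug, indexes it into sys-log-2021, and exits once the result set is exhausted.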

Continuous import

If the logs keep coming in, Logstash can track how far the last import got and pick up from there with an incremental import on the next run.

input {
  jdbc {
    jdbc_validate_connection => true
    jdbc_connection_string => "jdbc:oracle:thin:@192.168.100.1:1521:orcl"
    jdbc_user => "read"
    jdbc_password => "readonly"
    jdbc_driver_library => "D:\ojdbc6.jar"
    jdbc_driver_class => "Java::oracle.jdbc.driver.OracleDriver"
    jdbc_paging_enabled => true
    statement => "select t.log_id, t.log_time, t.client_ip, t.user_agent, t.browser, t.browser_type, t.browser_major_version, t.device_type, t.platform, t.platform_version from sys_log t
                  where EXTRACT(year from t.log_time) > 2020 and t.log_id > :sql_last_value order by t.log_id asc"
    lowercase_column_names => false

    # track progress via the LOG_ID column
    parameters => { "sql_last_value" => 0 }
    use_column_value => true
    tracking_column => "LOG_ID"
    tracking_column_type => "numeric"
    # run once per minute
    schedule => "0 * * * * *"
  }
}
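
On the first run :sql_last_value is 0 (the initial value for a numeric tracking column), so every row matching the WHERE clause is imported; after each run the plugin records the LOG_ID of the last row processed, which thanks to the order by is the largest one seen, and substitutes it into the next scheduled query. The schedule option uses rufus-scheduler cron syntax, which accepts both the standard 5 fields and a 6-field form with a leading seconds field; for comparison:

schedule => "0 * * * * *"   # second 0 of every minute (the 6-field form used above)
schedule => "*/5 * * * *"   # every 5 minutes (standard 5-field cron)
schedule => "0 0 * * * *"   # at the top of every hour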

filter {
  mutate {
    rename => {
      "LOG_ID" => "logId"
      "LOG_TIME" => "@timestamp"
      "CLIENT_IP" => "ip"
      "USER_AGENT" => "ua"
      "BROWSER" => "browser"
      "BROWSER_TYPE" => "browserType"
      "BROWSER_MAJOR_VERSION" => "browserMajorVersion"
      "DEVICE_TYPE" => "deviceType"
      "PLATFORM" => "platform"
      "PLATFORM_VERSION" => "platformVersion"
    }
  }
  geoip {
    source => "ip"
    database => "D:\logstash-7.3.1\GeoLite2-City_20210804\GeoLite2-City.mmdb"
  }
}

output {
  stdout { codec => rubydebug }

  if "_jsonparsefailure" not in [tags] {
    elasticsearch {
      hosts => ["192.168.100.2:9200"]
      index => "sys-log-2021"
    }
  }
}

The state file that records the last sql_last_value between runs is configured with last_run_metadata_path; it defaults to "$HOME/.logstash_jdbc_last_run".
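
To keep that state next to the Logstash installation instead of $HOME, point last_run_metadata_path somewhere explicit (the file name here is just an example); the file stores the tracked value as a small YAML document:

jdbc {
  # ... settings as above ...
  last_run_metadata_path => "D:\logstash-7.3.1\sys_log_last_run"
}

Deleting that file, or setting clean_run => true, resets the tracker so the next run starts again from the initial value.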