博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
logstash nested内嵌字段 field protobuf解码 codec 的解决办法
阅读量:5223 次
发布时间:2019-06-14

本文共 6442 字,大约阅读时间需要 21 分钟。

logstash nested内嵌字段 field protobuf解码 codec 的解决办法

主要需求

logstash-codec 下

https://www.elastic.co/guide/en/logstash/6.3/codec-plugins.html
此类解码器

只能应用在原始数据上

比如 https://www.elastic.co/guide/en/logstash/6.3/plugins-codecs-protobuf.html
kafka

{ zk_connect => "127.0.0.1" topic_id => "your_topic_goes_here" codec => protobuf {   class_name => "Animal::Unicorn"   include_path => ['/path/to/protobuf/definitions/UnicornProtobuf.pb.rb'] }}

 

对从kafka获取的原始数据进行解码

无法对应原始数据内部的某个字段解码

现在有一个应用场景 内嵌的某个字段是

encode_str= base64_encode(protobuf_encode())

{    "name":"cclient",    "encode_str":"....."}

 

codec-plugins 只能应用在完整数据上,而无法应用在encode_str字段上

官方提供一个相关功能的filter
https://www.elastic.co/guide/en/logstash/6.3/plugins-filters-json.html

对内嵌的json字段串 进行json解析

首选的办法

1 自已实现相关插件,完成特定解码工作,实际工作量不大,因为logstash的官方插件都开源

https://github.com/logstash-plugins/logstash-codec-protobuf

https://github.com/logstash-plugins/logstash-filter-json
以我的需求为例,结合logstash-codec-protobuf 和 logstash-filter-json
就能实现一套对内嵌protobuf字段的解码工具
这也是首先想到的办法,但没有执行,主要考虑到,这种场景有一定的通用性
这里是protobuf,如果换成avro,又需要另写一个插件
所以这个方案先放一放(放了一个晚上,同事就已经实现,一共也只花了几小时)

2 这个方案初期只是猜想,并未验证,思路是,既然官方已经提供了许多的codec,同时提供了ruby filter https://www.elastic.co/guide/en/logstash/6.3/plugins-filters-ruby.html,可否通过ruby filter,调用相应的codec来实现相应的解码功能?

由于同事已经实现了方案一,我主要就把精力放在方案2的验证上,先放结果

*可行。

毕竟不是资深的ruby开发,踩了一些坑,多花了些时间

验证步骤

1 首先是验证ruby是否可引用包

网上有很多引用包的资料,但主要是官方包

首先验证json解析

def json_decode(bin)  return  LogStash::Json.load(bin)end

 

* 官方包可行。

* 第三方未知。

2 尝试引用Protobuf的包,并看可否解码

这里参考了logstash-codec-protobuf的源码
实现
https://github.com/logstash-plugins/logstash-codec-protobuf/blob/master/lib/logstash/codecs/protobuf.rb

 

def register    @metainfo_messageclasses = {}    @metainfo_enumclasses = {}    @metainfo_pb2_enumlist = []    include_path.each { |path| load_protobuf_definition(path) }    if @protobuf_version == 3         @pb_builder = Google::Protobuf::DescriptorPool.generated_pool.lookup(class_name).msgclass    else      @pb_builder = pb2_create_instance(class_name)    endenddef decode(data)    if @protobuf_version == 3      decoded = @pb_builder.decode(data.to_s)      h = pb3_deep_to_hash(decoded)    else      decoded = @pb_builder.parse(data.to_s)      h = decoded.to_hash            end    yield LogStash::Event.new(h) if block_given?  rescue => e    @logger.warn("Couldn't decode protobuf: #{e.inspect}.")    if stop_on_error      raise e    endend # def decode

 

测试用例
https://github.com/logstash-plugins/logstash-codec-protobuf/blob/master/spec/codecs/protobuf_spec.rb

let(:plugin_unicorn) { LogStash::Codecs::Protobuf.new("class_name" => "Animal::Unicorn", "include_path" => [pb_include_path + '/pb2/unicorn.pb.rb'])  }    before do        plugin_unicorn.register          end    it "should return an event from protobuf encoded data" do          data = {:colour => 'rainbow', :horn_length => 18, :last_seen => 1420081471, :has_wings => true}      unicorn = Animal::Unicorn.new(data)              plugin_unicorn.decode(unicorn.serialize_to_string) do |event|        expect(event.get("colour") ).to eq(data[:colour] )        expect(event.get("horn_length") ).to eq(data[:horn_length] )        expect(event.get("last_seen") ).to eq(data[:last_seen] )        expect(event.get("has_wings") ).to eq(data[:has_wings] )      end    end # it

 

看到这里就觉得方案大概率能行的通

 

尝试引用第三方包

require "logstash/codecs/protobuf"plugin_unicorn = LogStash::Codecs::Protobuf.new("class_name" => "Tutorial::Person", "include_path" => ["/usr/share/logstash/protobuf.pb.rb","/usr/share/logstash/addressbook.pb.rb"],"protobuf_version" => 2)plugin_unicorn.registerdef base64_decode(bin)  return Base64.decode64(bin)enddef protobuf_decode(bin)  return protobuf_event=plugin_unicorn.decode(bin)end

 

启动成功

plugin_unicorn.register 是方法调用

*第三方包引用正常(前提是安装了这个包,例require "logstash/codecs/protobuf" 前提是 logstash-plugin install logstash-codec-protobuf)

但执行时报错,没有plugin_unicorn 空引用,猜测是作用域问题

查了下ruby的资料,plugin_unicorn 改为$plugin_unicorn(表示全局作用域),后没有空引用错误

require "logstash/codecs/protobuf"$plugin_unicorn = LogStash::Codecs::Protobuf.new("class_name" => "Tutorial::Person", "include_path" => ["/usr/share/logstash/protobuf.pb.rb","/usr/share/logstash/addressbook.pb.rb"],"protobuf_version" => 2)$plugin_unicorn.registerdef base64_decode(bin)  return Base64.decode64(bin)enddef protobuf_decode(bin)  return $protobuf_event=plugin_unicorn.decode(bin)end

 

新问题是protobuf_decode返回为null,主要时间都花在这个问题上

首先样例数据是同事给的,确认了数据没有问题,调整验证了protobuf的版本,不行,最后试着自已生成了样例数据,问题依旧

注意力都在decode 方法上

def decode(data)    if @protobuf_version == 3      decoded = @pb_builder.decode(data.to_s)      h = pb3_deep_to_hash(decoded)    else      decoded = @pb_builder.parse(data.to_s)      h = decoded.to_hash            end    yield LogStash::Event.new(h) if block_given?  rescue => e    @logger.warn("Couldn't decode protobuf: #{e.inspect}.")    if stop_on_error      raise e    endend # def decode

 

试了多种情况都不成功,参照decode实现了decode_str

def decode_str(data_str)    @logger.info("cdp source: #{data_str}.")    if @protobuf_version == 3      decoded = @pb_builder.decode(data_str)      h = pb3_deep_to_hash(decoded)      return h          else      decoded = @pb_builder.parse(data_str)      h = decoded.to_hash              @logger.info("cdp decoded: #{h}.")      return h    endend # def decode

 

decode_str 成功解码

就原始需求来说,问题解决了,但说白了这是在原始插件上作定制,仍然是方案1,方案2的验证,还未结束

测试改代码的时候突然注意到了一个小区别

yield 和 return

ruby的yield细节还不清楚,但任python和nodejs的经验,yield和return的行为不一样是一定的,yield不会返回结果,并通常有个类似next的方法,yield应该是结果null的原因

简单了解ruby yield的相关资料

def protobuf_decode(bin)  return $protobuf_event=plugin_unicorn.decode(bin)end

 

改为

def protobuf_decode(bin)  $plugin_unicorn.decode(bin) do |event|    return event.to_hash  endend

 

方案二可行,完美

回头再看,yield的坑,完全可以避免

官方源码里的测试用例,有明显的do |event|,因为不熟悉ruby 直觉的写成了return,导致多花了很多时间排查

plugin_unicorn.decode(unicorn.serialize_to_string) do |event|        expect(event.get("colour") ).to eq(data[:colour] )        expect(event.get("horn_length") ).to eq(data[:horn_length] )        expect(event.get("last_seen") ).to eq(data[:last_seen] )        expect(event.get("has_wings") ).to eq(data[:has_wings] )      end

 

方案三

方案一是对单独的codec进行插件开发和定制
方案二验证可行,已经完全满足要求,且有一定的通用性,但是虽然不必为每种codec作定制,仍需简单修改相关代码,作插件的初始化(每个插件的参数并不相同)

plugin_unicorn = LogStash::Codecs::Protobuf.new("class_name" => "Tutorial::Person", "include_path" => ["/usr/share/logstash/protobuf.pb.rb","/usr/share/logstash/addressbook.pb.rb"],"protobuf_version" => 2)

 

是否可以开发一个过滤器,通过配置(配置沿用官方插件),完成对所有codes的解析?

这个待有精力再研究

 

转载于:https://www.cnblogs.com/zihunqingxin/p/10078642.html

你可能感兴趣的文章
201571030319 四则运算
查看>>
RestTemplate 调用本地服务 connection refused
查看>>
.NET方向高级开发人员面试时应该事先考虑的问题
查看>>
台达PLC modbus 不支持04功能码
查看>>
python学习笔记--装饰器
查看>>
发布一个JavaScript工具类库jutil,欢迎使用,欢迎补充,欢迎挑错!
查看>>
discuz 常用脚本格式化数据
查看>>
MS CRM 2011 创建基于Fetch的报表 -- 进阶版
查看>>
洛谷P2777
查看>>
PHPStorm2017设置字体与设置浏览器访问
查看>>
SQL查询总结 - wanglei
查看>>
安装cocoa pods时出现Operation not permitted - /usr/bin/xcodeproj的问题
查看>>
makefile中使用变量
查看>>
GIT笔记:将项目发布到码云
查看>>
JavaScript:学习笔记(7)——VAR、LET、CONST三种变量声明的区别
查看>>
JavaScript 鸭子模型
查看>>
PHP典型功能与Laravel5框架开发学习笔记
查看>>
SQL Server 如何查询表定义的列和索引信息
查看>>
项目上传到github上
查看>>
GCD 之线程死锁
查看>>