博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Avro
阅读量:6847 次
发布时间:2019-06-26

本文共 16898 字,大约阅读时间需要 56 分钟。

hot3.png

由于最近在整理公司的培训事情,首先培训的就是Avro,故这里做一个记录

一、介绍,直接看官网来得快

官方网站:http://avro.apache.org/

1.1、Maven项目构建pom示例

所需要的jar包

 
             
                   
 junit                   
 junit                   
 4.12             
             
             
                   
 org.apache.avro                   
 avro                   
 1.7.7             
             
             
                   
 org.apache.avro                   
 avro-ipc
                   
 1.7.7             

所需要的插件,如果是在外部生成,可以不要

 
    
 org.apache.avro
    
 avro-maven-plugin
    
 1.7.7
    
          
                
 generate-sources                
                      
                      
                      
                      
 protocol                
                
                      
                      
 ${project.basedir}/src/main/resources/
                      
 ${project.basedir}/src/main/java/
                
          
    
 

二、消息结构

Avro的模式主要由JSON对象来表示,Avro支持8种基本类型(Primitive Type)和6种复杂类型(Complex Type:records、enums、arrays、maps、unions 和fixed),基本类型可以由JSON字符串来表示。

Avro支持两种序列化编码方式:二进制编码和JSON编码,使用二进制编码会高效序列化,并且序列化后得到的结果会比较小。

2.1、基本数据类型

null: 表示没有值 0字节

boolean: 表示一个二进制布尔值 一个字节 0-false,1-true

int: 表示32位有符号整数

long: 表示64位有符号整数
float: 表示32位的单精度浮点数(IEEE 754)4字节
double: 表示64位双精度浮点数(IEEE 754) 8字节
bytes: 表示8位的无符号字节序列
string: Unicode 编码的字符序列

总共就这8种原生数据类型,这些原生数据类型均没有明确的属性。

原生数据类型也可以使用JSON定义类型名称,比如schema "string"和{"type": "string"}是同义且相等的。

2.2、复合数据类型

2.2.1、records使用类型名称“record”,并且支持三个必选属性。

type: 必有属性。

name: 必有属性,是一个JSON string,提供了记录的名字。

namespace,也是一个JSON string,用来限定和修饰name属性,动态方式生成后,为Java的包名

doc: 可选属性,是一个JSON string,为使用这个Schema的用户提供文档。

aliases: 可选属性,是JSON的一个string数组,为这条记录提供别名。

fields: 必选属性,是一个JSON数组,数组中列举了所有的field。每一个field都是一个JSON对象,并且具有如下属性:

     name: 必选属性,field的名字,是一个JSON string,类似一个Java属性。

     doc: 可选属性,为使用此Schema的用户提供了描述此field的文档。

     type: 必选属性,定义Schema的一个JSON对象,或者是命名一条记录定义的JSON string。

     default: 可选属性,即field的默认值,当读到缺少这个field的实例时用到。

例如:

{"namespace": "example.avro","type": "record","name": "User","fields": [     {"name": "name", "type": "string"},     {"name": "favorite_number",  "type": ["int", "null"]},     {"name": "favorite_color", "type": ["string", "null"]}]}

说明:以上表示建立了一个复合类型User,具有三个字段属性,分别是字符串类型的name,整型的favorite_number,字符串类型的favorite_color,报名:example.avro,类名为:user

2.2.2、“enum”的type并且支持如下的属性:

name: 必有属性,是一个JSON string,提供了enum的名字。

namespace,也是一个JSON string,用来限定和修饰name属性。
aliases: 可选属性,是JSON的一个string数组,为这个enum提供别名。
doc: 可选属性,是一个JSON string,为使用这个Schema的用户提供文档。
symbols: 必有属性,是一个JSON string数组,列举了所有的symbol,在enum中的所有symbol都必须是唯一的,不允许重复。比如下面的例子:
例如

{ "type": "enum","name": "Suit","symbols" : ["SPADES", "HEARTS", "DIAMONDS", "CLUBS"]}

说明:以上表示建立了一个Suit的枚举类,含有symbols里面的属性

2.2.3、Array使用名为"array"的type,并且支持一个属性 items: array中元素的Schema

例如

{  "type": "record",  "name": "ArrAvro",  "fields" : [    {"name": "arr", "type": ["null",{"type":"array", "items":"string"}]}  ]}

2.2.4、Map使用名为"map"的type,并且支持一个属性 values: 用来定义map的值的Schema。Maps的key都是string。

比如一个key为string,value为long的maps定义为:

例如

{
  "type": "record",
  "name": "MapAvro",
  "fields" : [
    {"name": "map", "type": ["null",{"type":"map", "values":"string"}]}
  ]

}

2.2.5、序列化文件样式及RPC通讯协议样式编写

序列化schema文件

Members.avsc

{       "namespace":"com.ifree.serrpc.builder",       "type":"record",       "name":"Members",       "fields":[            {                   "name":"userName",                   "type":"string"            },            {                   "name":"userPwd",                   "type":"string"            },            {                   "name":"realName",                   "type":"string"            }                  ]      }

说明:该文件中的namespace命名空间在生成代码的时候会自动生成包路径;type类型是record复合类型;name为Members,在生成的时候,会生成一个Members类,建议首字母大写;fields是一个字段集合,里面的每一个字段类似与Java的实体字段,如userName字段,类型为String

RPC通讯协议protocol文件

Members.avpr

{       "namespace":"com.ifree.serrpc.builder",       "protocol":"MemberIFace",       "types":[            {                   "type":"record",                   "name":"Members",                   "fields":[                        {                               "name":"userName",                               "type":"string"                        },                        {                               "name":"userPwd",                               "type":"string"                        },                        {                               "name":"realName",                               "type":[                                     "string",                                     "null"                              ]                                                      }                                          ]                              },            {                   "type":"record",                   "name":"Retmsg",                   "fields":[                        {                               "name":"msg",                               "type":"string"                        }                  ]            }                  ],       "messages":{             "login":{                   "doc":"member login.",                   "request":[                        {                               "name":"m",                               "type":"Members"                        }                                          ],                   "response":"Retmsg"            }                  }}

说明:该文件中,namespace表示包路径;protocol表示协议,名字是MemberIFace,这里在工具生成的时候会生成一个类,故首字母大写;types是一个类型集合,返回类型和请求类型都可以在这里定义,里面的定义方式可参考序列化化定义文件;messages是表示一个请求返回消息体,login表示一个Java方法,里面有两部分组成,请求和响应,request表示请求,里面是一个members类型的对象,response表示相应,返回值,这里是Retmsg对象。

三、序列化编码

     Avro有两种序列化编码:binary和JSON。

四、序列化

      在使用序列化时,我们可以有两种方式来实现,一种是静态方式,采用schema文件来生成所需要的类,然后直接调用类里面的实现;另一种是动态方式,直接采用代码解析schema文件内容,动态设置内容。

五、RPC

      在使用Avro进行RPC通讯时,我们可以有两种方式来实现,一种是静态方式,采用protocol文件来生成所需要的类,然后直接调用类里面的实现;另一种是动态方式,直接采用代码解析protocol文件内容,动态设置内容。

六、综合示例

序列化Schema和RPC的Protocol都是上面的文件,这里不做列出

Java代码:

服务端代码:含(动态|工具)序列化、RPC通讯代码

package com.ifree.serrpc.avro;import java.io.File;import java.io.IOException;import java.net.InetSocketAddress;import java.util.Random;import org.apache.avro.Protocol;import org.apache.avro.Protocol.Message;import org.apache.avro.Schema;import org.apache.avro.Schema.Parser;import org.apache.avro.file.DataFileWriter;import org.apache.avro.generic.GenericData;import org.apache.avro.generic.GenericRecord;import org.apache.avro.io.DatumWriter;import org.apache.avro.ipc.HttpServer;import org.apache.avro.ipc.NettyServer;import org.apache.avro.ipc.Server;import org.apache.avro.ipc.generic.GenericResponder;import org.apache.avro.ipc.specific.SpecificResponder;import org.apache.avro.specific.SpecificDatumWriter;import org.junit.Test;import com.ifree.serrpc.builder.MemberIFace;import com.ifree.serrpc.builder.Members;import com.ifree.serrpc.impl.MemberIFaceImpl;/** * 会员信息处理服务端 *  * @author ifree * */public class MemberServerProvider {	/**	 * 动态序列化:通过动态解析Schema文件进行内容设置,并序列化内容	 * 	 * @throws IOException	 */	@Test	public void MemberInfoDynSer() throws IOException {		// 1.解析schema文件内容		Parser parser = new Parser();		Schema mSchema = parser.parse(this.getClass().getResourceAsStream("/Members.avsc"));		// 2.构建数据写对象		DatumWriter
 mGr = new SpecificDatumWriter
(mSchema); DataFileWriter
 mDfw = new DataFileWriter
(mGr); // 3.创建序列化文件 mDfw.create(mSchema, new File("E:/avro/members.avro")); // 4.添加序列化数据 for (int i = 0; i < 20; i++) { GenericRecord gr = new GenericData.Record(mSchema); int r = i * new Random().nextInt(50); gr.put("userName", "xiaoming-" + r); gr.put("userPwd", "9999" + r); gr.put("realName", "小明" + r + "号"); mDfw.append(gr); } // 5.关闭数据文件写对象 mDfw.close(); System.out.println("Dyn Builder Ser Start Complete."); } /**  * 通过Java工具生成文件方式进行序列化操作 命令:C:\Users\Administrator>java -jar  * E:\avro\avro-tools-1.7.7.jar compile schema E:\avro\Members.avsc E:\avro  *   * @throws IOException  */ @Test public void MemberInfoToolsSer() throws IOException { // 1.为Member生成对象进行设置必要的内容,这里实现三种设置方式的演示 // 1.1、构造方式 Members m1 = new Members("xiaoming", "123456", "校名"); // 1.2、属性设置 Members m2 = new Members(); m2.setUserName("xiaoyi"); m2.setUserPwd("888888"); m2.setRealName("小艺"); // 1.3、Builder方式设置 Members m3 = Members.newBuilder().setUserName("xiaohong").setUserPwd("999999").setRealName("小红").build(); // 2.构建反序列化写对象 DatumWriter
 mDw = new SpecificDatumWriter
(Members.class); DataFileWriter
 mDfw = new DataFileWriter
(mDw); // 2.1.通过对Members.avsc的解析创建Schema Schema schema = new Parser().parse(AvroSerProvider.class.getClass().getResourceAsStream("/Members.avsc")); // 2.2.打开一个通道,把schema和输出的序列化文件关联起来 mDfw.create(schema, new File("E:/avro/members.avro")); // 4.把刚刚创建的Users类数据追加到数据文件写入对象中 mDfw.append(m1); mDfw.append(m2); mDfw.append(m3); // 5.关闭数据文件写入对象 mDfw.close(); System.out.println("Tools Builder Ser Start Complete."); } // ******************************************************ser // end********************************************************* /**  * 服务端支持的网络通讯协议有:NettyServer、SocketServer、HttpServer  * 采用HTTPSERVER方式调用  *   * @throws IOException  * @throws InterruptedException  */ @Test public void MemberHttpRPCDynBuilderServer() throws IOException, InterruptedException { // 1.进行业务处理 GenericResponder gr = bussinessDeal(); // 2.开启一个HTTP服务端,进行等待客户端的连接 Server server = new HttpServer(gr, 60090); server.start(); System.out.println("Dyn Builder PRC Start Complete."); server.join(); } /**  * 服务端支持的网络通讯协议有:NettyServer、SocketServer、HttpServer  * 采用Netty方式调用  *   * @throws IOException  * @throws InterruptedException  */ @Test public void MemberNettyRPCDynBuilderServer() throws IOException, InterruptedException { // 1.进行业务处理 GenericResponder gr = bussinessDeal(); // 2.开启一个Netty服务端,进行等待客户端的连接 Server server = new NettyServer(gr, new InetSocketAddress(60090)); server.start(); System.out.println("Dyn Builder PRC Start Complete."); server.join(); } /**  * 主要进行业务处理 服务端逻辑处理 采用动态生成代码处理方式,客户端和服务端只需要有protocol文件即可,不需要手工生成代码  *   * @return  * @throws IOException  */ private GenericResponder bussinessDeal() throws IOException { // 1.构建协议 final Protocol protocol = Protocol.parse(this.getClass().getResourceAsStream("/Members.avpr")); // 2.构建业务逻辑及响应客户端 GenericResponder gr = new GenericResponder(protocol) { @Override public Object respond(Message message, Object request) throws Exception { System.err.println("request:" + request); // 3.获取请求信息 GenericRecord record = (GenericRecord) request; GenericRecord retGr = null; // 4.判断请求的方法 if (message.getName().equals("login")) { // 5.获取到传输的参数 Object obj = record.get("m"); GenericRecord mGr = (GenericRecord) obj; String userName = mGr.get("userName").toString(); String userPwd = mGr.get("userPwd").toString(); // 6.进行相应的业务逻辑处理 System.out.println("Members:" + ",userName:" + userName + mGr + ",userPwd:" + userPwd); String retMsg; if (userName.equalsIgnoreCase("rita") && userPwd.equals("123456")) { retMsg = "哈哈,恭喜你,成功登录。"; System.out.println(retMsg); } else { retMsg = "登录失败。"; System.out.println(retMsg); } // 7.获取返回值类型 retGr = new GenericData.Record(protocol.getMessages().get("login").getResponse()); // 8.构造回复消息 retGr.put("msg", retMsg); } System.err.println("DEAL SUCCESS!"); return retGr; } }; return gr; } /**  * Java工具生成协议代码方式:java -jar  E:\avro\avro-tools-1.7.7.jar compile protocol E:\avro\Members.avpr E:\avro  * 功能和动态调用方式一致  * @throws InterruptedException   */ @Test public void MemberNettyRPCToolsBuilderServer() throws InterruptedException{ //1.构造接口和实现类的映射相应对象,MemberIFaceImpl该类为具体的业务实现类 SpecificResponder responder=new SpecificResponder(MemberIFace.class, new MemberIFaceImpl()); //2.Netty启动RPC服务 Server server=new NettyServer(responder, new InetSocketAddress(60090)); server.start(); System.out.println("Tools Builder PRC Start Complete."); server.join(); }}

客户端代码:含(动态|工具)反序列化、RPC通讯代码

package com.ifree.serrpc.avro;import java.io.File;import java.io.IOException;import java.net.InetSocketAddress;import java.net.URL;import org.apache.avro.Protocol;import org.apache.avro.Schema;import org.apache.avro.Schema.Parser;import org.apache.avro.file.DataFileReader;import org.apache.avro.generic.GenericData;import org.apache.avro.generic.GenericRecord;import org.apache.avro.io.DatumReader;import org.apache.avro.ipc.HttpTransceiver;import org.apache.avro.ipc.NettyTransceiver;import org.apache.avro.ipc.Transceiver;import org.apache.avro.ipc.generic.GenericRequestor;import org.apache.avro.ipc.specific.SpecificRequestor;import org.apache.avro.specific.SpecificDatumReader;import org.junit.Test;import com.ifree.serrpc.builder.MemberIFace;import com.ifree.serrpc.builder.Members;import com.ifree.serrpc.builder.Retmsg;/** * 服务消费者 该类测试了通过工具和动态序列化及反序列化两种方式,同时测试了通过工具生成代码及动态调用RPC服务两种方式 *  * @author ifree * */public class MemberServerConsumer {	/**	 * 动态反序列:通过Schema文件进行动态反序列化操作	 * 	 * @throws IOException	 */	@Test	public void MemberInfoDynDeser() throws IOException {		// 1.schema文件解析		Parser parser = new Parser();		Schema mSchema = parser.parse(this.getClass().getResourceAsStream("/Members.avsc"));		// 2.构建数据读对象		DatumReader
 mGr = new SpecificDatumReader
(mSchema); DataFileReader
 mDfr = new DataFileReader
(new File("E:/avro/members.avro"), mGr); // 3.从序列化文件中进行数据反序列化取出数据 GenericRecord gr = null; while (mDfr.hasNext()) { gr = mDfr.next(); System.err.println("deser data:" + gr.toString()); } mDfr.close(); System.out.println("Dyn Builder Ser Start Complete."); } /**  * 通过Java工具来生成必要的类,进行反序列化操作  *   * @throws IOException  */ @Test public void MemberInfoToolsDeser() throws IOException { // 1.构建反序列化读取对象 DatumReader
 mDr = new SpecificDatumReader
(Members.class); DataFileReader
 mDfr = new DataFileReader
(new File("E:/avro/members.avro"), mDr); Members m = null; // 2.循环读取文件数据 while (mDfr.hasNext()) { m = mDfr.next(); System.err.println("tools deser data :" + m); } // 3.关闭读取对象 mDfr.close(); System.out.println("Tools Builder Ser Start Complete."); } /**  * 采用HTTP方式建立和服务端的连接  *   * @throws IOException  */ @Test public void MemberHttpRPCDynBuilderClient() throws IOException { // 1.建立和服务端的http通讯 Transceiver transceiver = new HttpTransceiver(new URL("http://192.168.1.116:60090")); bussinessDeal(transceiver); } /**  * 采用Netty方式建立和服务端的连接  *   * @throws IOException  */ @Test public void MemberNettyRPCDynBuilderClient() throws IOException { // 1.建立和服务端的Netty通讯 Transceiver transceiver = new NettyTransceiver(new InetSocketAddress("192.168.1.116", 60090)); // 2.进行必要的业务处理 bussinessDeal(transceiver); } /**  * 进行必要的业务处理  *   * @param transceiver  * @throws IOException  */ private void bussinessDeal(Transceiver transceiver) throws IOException { // 2.获取协议 Protocol protocol = Protocol.parse(this.getClass().getResourceAsStream("/Members.avpr")); // 3.根据协议和通讯构造请求对象 GenericRequestor requestor = new GenericRequestor(protocol, transceiver); // 4.根据schema获取messages主节点内容 GenericRecord loginGr = new GenericData.Record(protocol.getMessages().get("login").getRequest()); // 5.在根据协议里面获取request中的schema GenericRecord mGr = new GenericData.Record(protocol.getType("Members")); // 6.设置request中的请求数据 mGr.put("userName", "rita"); mGr.put("userPwd", "123456"); // 7、把二级内容加入到一级message的主节点中 loginGr.put("m", mGr); // 8.设置完毕后,请求方法,正式发送访问请求信息,并得到响应内容 Object retObj = requestor.request("login", loginGr); // 9.进行解析操作 GenericRecord upGr = (GenericRecord) retObj; System.out.println(upGr.get("msg")); } /**  * Java工具生成协议代码方式:java -jar E:\avro\avro-tools-1.7.7.jar compile protocol  * E:\avro\Members.avpr E:\avro 功能和动态调用方式一致  *   * @throws InterruptedException  * @throws IOException  */ @Test public void MemberNettyRPCToolsBuilderClient() throws InterruptedException, IOException { // 1.和服务端建立通讯 Transceiver transceiver = new NettyTransceiver(new InetSocketAddress("192.168.1.116", 60090)); // 2.获取客户端对象 MemberIFace memberIFace = SpecificRequestor.getClient(MemberIFace.class, transceiver); // 3.进行数据设置 Members members = new Members(); members.setUserName("rita"); members.setUserPwd("123456"); // 开始调用登录方法 Retmsg retmsg = memberIFace.login(members); System.out.println("Recive Msg:" + retmsg.getMsg()); }}

具体业务实现类

package com.ifree.serrpc.impl;import org.apache.avro.AvroRemoteException;import com.ifree.serrpc.builder.MemberIFace;import com.ifree.serrpc.builder.Members;import com.ifree.serrpc.builder.Retmsg;/*** 具体的业务处理类* @author Administrator**/public class MemberIFaceImpl implements MemberIFace {     final String userName="rita";     final String userPwd="888888";     /**     * 登录业务处理     */     @Override     public Retmsg login(Members m) throws AvroRemoteException {          //验证登录权限          if(m.getUserName().equals(userName)&&m.getUserPwd().equals(userPwd)){               return new Retmsg("恭喜你,登录成功,欢迎进入AVRO测试环境。");          }          return new Retmsg("对不起,权限不足,不能登录。");     }}

demo地址:

转载于:https://my.oschina.net/tearsky/blog/509610

你可能感兴趣的文章
Nginx代理Tomcat
查看>>
Apache与Tomcat的区别
查看>>
mysql—Access denied for user 'root'@'localhost' (using password:NO)
查看>>
hibernate 懒加载异常
查看>>
python3的zip函数
查看>>
Paxos算法详细图解
查看>>
如何用Exchange Server 2003 构建多域名邮件系统
查看>>
httpd服务如何开机启动
查看>>
JAVA帮助文档全系列 JDK1.5 JDK1.6 JDK1.7 官方中英完整版下载
查看>>
android 1.6中LinearLayout getBaseline函数的一个bug
查看>>
shell3
查看>>
分享几个好用的工具,有效提升工作效率
查看>>
论北京北漂的家人们
查看>>
delphi 检查用户输入必须是汉字串
查看>>
思科交换机和路由器设备实现DHCP功能
查看>>
MongoDB安装与操作大全
查看>>
人我的是好有是的好sula
查看>>
编译工程时报java:[1,0] illegal character: \65279问题排查与解决过
查看>>
RHEL6子接口及双网卡绑定配置
查看>>
常见系统故障排查
查看>>