HBase应用小记(4)--HBase 0.96 Endpoints使用记录
本文为系列文章hbase中的一篇,相关文章:
HBase 0.96及以上版本实现Endpoints的方式跟0.94版本及以前版本是不一样的,原因是0.96版本中protocol buffers的引入, 此处不介绍之前的实现方式,仅留下链接,其中有所介绍:Coprocessor Introduction
Coprocessor包括Observers和Endpoints,这里只详细介绍Endpoints的实现。Endpoint的实现是安装在HBase服务器端的,客户端通过HBase RPC调用endpoint。 实现一个endpoint的步骤如下:
-
编写.proto文件,在其中定义coprocessor service和相关的messages,并执行protoc命令编译该文件 (HBase使用小记(1)–HBase 0.96 自定义Comparator中有介绍如何编写.proto文件以及编译方法)
-
编写Endpoint类,实现如下接口
(1)由.proto文件编译生成的java代码中的Service接口 (2)org.apache.hadoop.hbase.coprocessor.CoprocessorService接口,该接口是RegionCoprocessorHost用于注册service的
-
客户端调用方法HTable.coprocessorService()执行endpoint RPCs。
通过一个详细点儿的小示例走一遍上述步骤,假设HBase表student_info中的每条记录是一个学生的个人信息,包括姓名、性别、身份证等, 我们想获取男生(或女生)的总人数,不可能把所有的记录取回客户端再统计,那么可以通过Endpoint实现。
#第一步,编写并编译NumCount.proto文件##
该文件包括三个部分:request、response、service。
option java_outer_classname = "NumCountProtos";
option java_generic_services = true;
message NumCountRequest{
required int32 sex = 1;
}
message NumCountResponse{
required int64 count = 1;
}
service NumCountService{
rpc getNumCount(NumCountRequest) returns (NumCountResponse);
}
编译生成NumCountProtos.java文件
#第二步,编写Endpoint类##
NumCountEndpoint.java代码如下:
package us.gfzj.hadooprelated.hbase.endpoint;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.Coprocessor;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.coprocessor.CoprocessorException;
import org.apache.hadoop.hbase.coprocessor.CoprocessorService;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.util.Bytes;
import com.google.protobuf.RpcCallback;
import com.google.protobuf.RpcController;
import com.google.protobuf.Service;
import us.gfzj.hadooprelated.hbase.endpoint.NumCountProtos.NumCountRequest;
import us.gfzj.hadooprelated.hbase.endpoint.NumCountProtos.NumCountResponse;
import us.gfzj.hadooprelated.hbase.endpoint.NumCountProtos.NumCountService;
public class NumCountEndpoint extends NumCountService implements
CoprocessorService, Coprocessor {
private RegionCoprocessorEnvironment env;
@Override
public void start(CoprocessorEnvironment arg0) throws IOException {
if (env instanceof RegionCoprocessorEnvironment) {
this.env = (RegionCoprocessorEnvironment) env;
} else {
throw new CoprocessorException("Must be loaded on a table region!");
}
}
@Override
public void stop(CoprocessorEnvironment arg0) throws IOException {
// nothing to do
}
@Override
public Service getService() {
return this;
}
@Override
public void getNumCount(RpcController controller, NumCountRequest request,
RpcCallback<NumCountResponse> done) {
NumCountResponse response = null;
int sex = request.getSex();
Scan scan = new Scan();
scan.setFilter(new SingleColumnValueFilter(Bytes.toBytes("cf"),
Bytes.toBytes("sex"), CompareOp.EQUAL, Bytes.toBytes(sex)));
int count = 0;
try (InternalScanner scanner = env.getRegion().getScanner(scan)) {
List<Cell> kvResults = new ArrayList<Cell>();
while (scanner.next(kvResults)) {
count++;
kvResults.clear();
}
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
response = NumCountResponse.newBuilder().setCount(count).build();
done.run(response);
}
}
#第三步,编写Endpoints客户端##
NumCountClient.java代码如下:
package us.gfzj.hadooprelated.hbase.endpoint;
import java.io.IOException;
import java.util.Map;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.coprocessor.Batch;
import org.apache.hadoop.hbase.ipc.BlockingRpcCallback;
import org.apache.hadoop.hbase.ipc.ServerRpcController;
import com.google.protobuf.ServiceException;
import us.gfzj.hadooprelated.hbase.endpoint.NumCountProtos.NumCountRequest;
import us.gfzj.hadooprelated.hbase.endpoint.NumCountProtos.NumCountResponse;
import us.gfzj.hadooprelated.hbase.endpoint.NumCountProtos.NumCountService;
import us.gfzj.hadooprelated.hbase.util.BasicConnection;
public class NumCountClient {
public static void main(String[] args) throws ServiceException, Throwable {
HTableInterface table = null;
//initialize your table
//table = BasicConnection.getTable("student_info");
Batch.Call<NumCountService, NumCountResponse> callable =
new Batch.Call<NumCountService, NumCountResponse>() {
ServerRpcController controller = new ServerRpcController();
BlockingRpcCallback<NumCountResponse> rpcCallback =
new BlockingRpcCallback<NumCountResponse>();
@Override
public NumCountResponse call(NumCountService service)
throws IOException {
NumCountRequest.Builder builder = NumCountRequest.newBuilder();
builder.setSex(0); //0 表示男性 1表示女性
service.getNumCount(controller, builder.build(), rpcCallback);
return rpcCallback.get();
}
};
Map<byte[], NumCountResponse> result = table.coprocessorService(NumCountService.class,
null, null, callable);
for (NumCountResponse response : result.values())
System.out.println(response.getCount());
table.close();
} }
其实,HBase本身为很多数据结构提供序列化工具,如scan、filter等都有,在hbase的hbase-protocol子工程中有其对应的.proto文件和 编译后的文件。这些工具,在coprocessor的调用中中可以直接使用,例如,客户端直接给出scan对象作为调用coprocessor的参数, endpoint端有提供工具ProtobufUtil(Scan scan = ProtobufUtil.toScan(request.getScan());),直接调用就可以反序列化。 上述例子使用的NumCountRequest完全可以在客户端由scan代替,此处使用NumCountRequest仅是为了更全面地介绍endpoint。