HBase 0.96 and later implement Endpoints differently from 0.94 and earlier, because 0.96 introduced protocol buffers. The old approach is not covered here; see this article for an introduction to it: Coprocessor Introduction

Coprocessors come in two flavors, Observers and Endpoints; this post only walks through Endpoints in detail. An endpoint implementation is installed on the HBase server side, and clients invoke it through HBase RPC. Implementing an endpoint takes the following steps:

  • Write a .proto file defining the coprocessor service and the related messages, then compile it with protoc (how to write and compile a .proto file is covered in HBase使用小记(1)–HBase 0.96 自定义Comparator)

  • Write the Endpoint class, implementing the following two interfaces:

    (1) the Service defined in the Java code generated from the .proto file (in practice you extend the generated abstract service class)
    (2) the org.apache.hadoop.hbase.coprocessor.CoprocessorService interface, which RegionCoprocessorHost uses to register the service

  • On the client side, call HTable.coprocessorService() to execute the endpoint RPCs.

Let's walk through these steps with a slightly more detailed example. Suppose each row of the HBase table student_info holds one student's personal information (name, sex, ID number, and so on), and we want the total number of male (or female) students. Pulling every row back to the client just to count it is out of the question, so we implement the count as an Endpoint.
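The code in the following steps assumes student_info has a single column family cf and that the sex qualifier stores a 4-byte int (0 for male, 1 for female), since that is the binary form the endpoint's filter will compare against. A minimal sketch of writing one such row under those assumptions (the class name, row key, and values here are made up):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

public class StudentInfoWriter {
	public static void main(String[] args) throws Exception {
		Configuration conf = HBaseConfiguration.create();
		HTable table = new HTable(conf, "student_info");
		Put put = new Put(Bytes.toBytes("20140001")); // row key: a made-up student id
		put.add(Bytes.toBytes("cf"), Bytes.toBytes("name"), Bytes.toBytes("Zhang San"));
		// sex is stored as a 4-byte int so the endpoint's filter can match it byte-for-byte
		put.add(Bytes.toBytes("cf"), Bytes.toBytes("sex"), Bytes.toBytes(0));
		table.put(put);
		table.close();
	}
}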

#Step 1: write and compile the NumCount.proto file##

The file has three parts: the request message, the response message, and the service definition.

// generate into the same package as the endpoint and client code below
option java_package = "us.gfzj.hadooprelated.hbase.endpoint";
option java_outer_classname = "NumCountProtos";
option java_generic_services = true;

message NumCountRequest{
	required int32 sex = 1;
}

message NumCountResponse{
	required int64 count = 1;
}

service NumCountService{
	rpc getNumCount(NumCountRequest) returns (NumCountResponse);
}

Compiling this file produces NumCountProtos.java.
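With protoc 2.5 (the protobuf version HBase 0.96 depends on) on your PATH, the compile command looks roughly like this; the output directory is just an example and should point at your project's source root:

protoc --java_out=./src/main/java NumCount.proto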

#Step 2: write the Endpoint class##

NumCountEndpoint.java is as follows:

package us.gfzj.hadooprelated.hbase.endpoint;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.Coprocessor;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.coprocessor.CoprocessorException;
import org.apache.hadoop.hbase.coprocessor.CoprocessorService;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
import org.apache.hadoop.hbase.protobuf.ResponseConverter;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.util.Bytes;

import com.google.protobuf.RpcCallback;
import com.google.protobuf.RpcController;
import com.google.protobuf.Service;

import us.gfzj.hadooprelated.hbase.endpoint.NumCountProtos.NumCountRequest;
import us.gfzj.hadooprelated.hbase.endpoint.NumCountProtos.NumCountResponse;
import us.gfzj.hadooprelated.hbase.endpoint.NumCountProtos.NumCountService;

public class NumCountEndpoint extends NumCountService implements
		CoprocessorService, Coprocessor {

	private RegionCoprocessorEnvironment env;

	@Override
	public void start(CoprocessorEnvironment env) throws IOException {
		// keep a reference to the region environment; an endpoint only makes
		// sense when loaded on a table region
		if (env instanceof RegionCoprocessorEnvironment) {
			this.env = (RegionCoprocessorEnvironment) env;
		} else {
			throw new CoprocessorException("Must be loaded on a table region!");
		}
	}

	@Override
	public void stop(CoprocessorEnvironment env) throws IOException {
		// nothing to do
	}

	@Override
	public Service getService() {
		return this;
	}

	@Override
	public void getNumCount(RpcController controller, NumCountRequest request,
			RpcCallback<NumCountResponse> done) {
		int sex = request.getSex();
		Scan scan = new Scan();
		// compare against the raw 4-byte int form, matching how sex is stored
		SingleColumnValueFilter filter = new SingleColumnValueFilter(Bytes.toBytes("cf"),
				Bytes.toBytes("sex"), CompareOp.EQUAL, Bytes.toBytes(sex));
		// rows lacking the sex column would otherwise pass the filter by default
		filter.setFilterIfMissing(true);
		scan.setFilter(filter);
		long count = 0;
		try (InternalScanner scanner = env.getRegion().getScanner(scan)) {
			List<Cell> kvResults = new ArrayList<Cell>();
			boolean hasMore;
			// next() returns false on the final batch, which may still contain
			// a row, so a do-while is needed to avoid dropping the last row
			do {
				hasMore = scanner.next(kvResults);
				if (!kvResults.isEmpty()) {
					count++;
				}
				kvResults.clear();
			} while (hasMore);
		} catch (IOException ioe) {
			// report the failure to the client through the controller
			ResponseConverter.setControllerException(controller, ioe);
		}
		NumCountResponse response = NumCountResponse.newBuilder()
				.setCount(count).build();
		done.run(response);
	}

}
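Before the client can call the endpoint, it has to be deployed: package NumCountEndpoint together with the generated NumCountProtos into a jar, put the jar on the RegionServers' classpath, and register the coprocessor on the table. A minimal sketch of registering it through the table descriptor (the class name here could equally be configured cluster-wide via the hbase.coprocessor.region.classes property in hbase-site.xml):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.util.Bytes;

public class DeployNumCountEndpoint {
	public static void main(String[] args) throws Exception {
		Configuration conf = HBaseConfiguration.create();
		HBaseAdmin admin = new HBaseAdmin(conf);
		byte[] tableName = Bytes.toBytes("student_info");
		try {
			// the table must be disabled before its descriptor can be modified
			admin.disableTable(tableName);
			HTableDescriptor htd = admin.getTableDescriptor(tableName);
			// register the endpoint; the jar must already be on the
			// RegionServers' classpath
			htd.addCoprocessor("us.gfzj.hadooprelated.hbase.endpoint.NumCountEndpoint");
			admin.modifyTable(tableName, htd);
			admin.enableTable(tableName);
		} finally {
			admin.close();
		}
	}
}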

#Step 3: write the Endpoint client##

NumCountClient.java is as follows:

package us.gfzj.hadooprelated.hbase.endpoint;

import java.io.IOException;
import java.util.Map;

import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.coprocessor.Batch;
import org.apache.hadoop.hbase.ipc.BlockingRpcCallback;
import org.apache.hadoop.hbase.ipc.ServerRpcController;

import com.google.protobuf.ServiceException;

import us.gfzj.hadooprelated.hbase.endpoint.NumCountProtos.NumCountRequest;
import us.gfzj.hadooprelated.hbase.endpoint.NumCountProtos.NumCountResponse;
import us.gfzj.hadooprelated.hbase.endpoint.NumCountProtos.NumCountService;
import us.gfzj.hadooprelated.hbase.util.BasicConnection;

public class NumCountClient {

	public static void main(String[] args) throws ServiceException, Throwable {
		HTableInterface table = null;
		//initialize your table
		//table = BasicConnection.getTable("student_info");
		Batch.Call<NumCountService, NumCountResponse> callable =
				new Batch.Call<NumCountService, NumCountResponse>() {
			ServerRpcController controller = new ServerRpcController();
			BlockingRpcCallback<NumCountResponse> rpcCallback =
					new BlockingRpcCallback<NumCountResponse>();

			@Override
			public NumCountResponse call(NumCountService service)
					throws IOException {
				NumCountRequest.Builder builder = NumCountRequest.newBuilder();
				builder.setSex(0); // 0 = male, 1 = female
				service.getNumCount(controller, builder.build(), rpcCallback);
				NumCountResponse response = rpcCallback.get();
				// the controller records server-side failures instead of
				// throwing them, so surface them here
				if (controller.failedOnException()) {
					throw controller.getFailedError();
				}
				return response;
			}
		};

		// null start/end keys cover every region; the endpoint runs once per
		// region, so the map holds one response per region and the counts
		// must be summed for the table-wide total
		Map<byte[], NumCountResponse> result = table.coprocessorService(
				NumCountService.class, null, null, callable);
		long total = 0;
		for (NumCountResponse response : result.values()) {
			total += response.getCount();
		}
		System.out.println(total);

		table.close();
	}
}

As it happens, HBase already ships protobuf serialization for many of its own data structures: Scan, Filter, and others all have corresponding .proto definitions and generated classes in the hbase-protocol subproject. These can be used directly in coprocessor calls. For example, the client can pass a Scan object as the coprocessor argument, and on the endpoint side a single call to ProtobufUtil deserializes it (Scan scan = ProtobufUtil.toScan(request.getScan());). In the example above, NumCountRequest could have been replaced entirely by a client-built Scan; a custom request message was used only to give a fuller tour of endpoints.
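A rough sketch of that round trip, using the ClientProtos.Scan type from hbase-protocol (the wrapper class name here is made up for illustration; in a real endpoint the protobuf form would travel inside your request message):

import java.io.IOException;

import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;

public class ScanRoundTrip {
	// client side: serialize a Scan into the protobuf form shipped with hbase-protocol
	public static ClientProtos.Scan serialize(Scan scan) throws IOException {
		return ProtobufUtil.toScan(scan);
	}

	// endpoint side: deserialize the protobuf back into a usable Scan
	public static Scan deserialize(ClientProtos.Scan proto) throws IOException {
		return ProtobufUtil.toScan(proto);
	}
}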