The Observer coprocessor typically executes before or after a specific event, such as a Get or Put operation. This is similar to a trigger in a relational database management system (RDBMS). The Endpoint coprocessor, on the other hand, resembles a stored procedure in RDBMS because it allows users to perform custom computations directly on the RegionServer instead of the client side.
1 Introduction to Coprocessors
When you need to compute statistics over HBase data, such as the maximum value of a field, the number of records that meet certain conditions, or a classification of records by their characteristics, the conventional approach is to scan the entire table and perform the statistical processing on the client side. This method can incur significant overhead, including high network bandwidth usage and increased RPC pressure.
HBase, as a column-oriented database, has often been criticized for its inability to easily create secondary indexes and for the difficulty of performing operations like summation, counting, and sorting. In early versions, calculating the total number of rows required a Counter method or a MapReduce job. Although HBase integrates with MapReduce for distributed computation, many simple aggregation tasks would benefit from executing directly on the server side to reduce network overhead and improve performance. HBase therefore introduced coprocessors in version 0.92, enabling features like easy secondary index creation, complex filters, and access control.
In simple terms, a coprocessor is a mechanism that allows users to execute part of their logic on the data storage side, which is the HBase server itself. It enables users to run their own code on the HBase server, enhancing efficiency and reducing data transfer across the network.
2 Classification of Coprocessors
There are two types of coprocessors: system coprocessors, which apply globally to all tables on a RegionServer, and table coprocessors, which users can specify for individual tables. The coprocessor framework offers different plugin interfaces to enhance flexibility. One is the Observer, which functions similarly to a trigger in an RDBMS, while the other is the Endpoint, akin to a stored procedure.
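For example, a system coprocessor can be loaded globally through hbase-site.xml (a minimal sketch; the class name reuses the SumEndPoint example developed later in this article, and the class must be on the RegionServer classpath):
<property>
    <name>hbase.coprocessor.region.classes</name>
    <value>com.hbase.demo.endpoint.SumEndPoint</value>
</property>
Table coprocessors, by contrast, are attached to a single table with the alter command shown in sections 4.3 and 5.2.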
The Observer design lets users override the upcall methods of the coprocessor framework with their own code. The event-triggered callback methods are executed by HBase's core code; the coprocessor framework handles all the details of the callback invocation, so the coprocessor itself only needs to supply the additional functionality.
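To give a feel for this style, the sketch below overrides a single hook (assuming the HBase 1.x BaseRegionObserver API used later in this article; AuditObserver is a hypothetical class). The method runs on the RegionServer before every Get, much like a BEFORE trigger:
import java.io.IOException;
import java.util.List;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.util.Bytes;

public class AuditObserver extends BaseRegionObserver {
    @Override
    public void preGetOp(ObserverContext<RegionCoprocessorEnvironment> e,
            Get get, List<Cell> results) throws IOException {
        // Executed by the framework before each Get on this region
        System.out.println("Get on row: " + Bytes.toString(get.getRow()));
    }
}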
The Endpoint provides an interface for dynamic RPC plugins. Its implementation code is installed on the server side, allowing it to be triggered via HBase RPC. The client library offers a convenient way to call these dynamic interfaces, enabling users to invoke endpoints at any time. The implementation code runs remotely on the target Region, and the result is returned to the client. Users can combine these powerful plugin interfaces to add new features to HBase.
3 Use of Protocol Buffer
Since the Endpoint example below is coded against Google's language-neutral data interchange format, Protocol Buffer, let's first look at this tool commonly used in RPC systems.
3.1 Introduction to Protocol Buffer
Protocol Buffer is a lightweight, efficient format for serializing structured data, well suited to data storage and RPC data exchange. It is language-independent, platform-independent, and extensible, and is used for communication protocols, data storage, and more. Official APIs are available for C++, Java, and Python.
Why use Protocol Buffer? Consider a common scenario in actual development where the client program is written in Java and may run on various platforms, such as Linux, Windows, or Android, while the server program is usually developed on Linux using C++. When communicating between these programs, there are several ways to design the message format, such as:
1. Directly passing byte-aligned structures in C/C++. If the structure declaration is a fixed-length format, this method is very convenient for C/C++ programs. However, it is cumbersome for Java developers, who must store the received data in a ByteBuffer and read each field one by one in the agreed byte order before assigning the values to member variables.
2. Using the SOAP protocol (WebService) as the message format. This method generates text-based messages carrying a large amount of XML description information, which increases the network I/O burden; and because XML parsing is complex, it also significantly reduces message-parsing performance. Overall, this design degrades the system's overall performance.
Protocol Buffer effectively solves the problems caused by the above two methods. Additionally, it ensures compatibility between new and old versions of the same message, making it a reliable choice for data serialization.
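Much of that compatibility comes from the numbered-field design: a new optional field can be added to a message without breaking old readers or writers. As a hypothetical illustration (the rowPrefix field is invented; SumRequest itself is defined in section 3.3):
message SumRequest {
    required string family = 1;
    required string column = 2;
    optional string rowPrefix = 3;  // new clients may set it; old code skips it as an unknown field
}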
3.2 Installing Protocol Buffer
// Extract the downloaded protobuf-2.6.1.tar.gz (from https://developers.google.com/protocol-buffers/docs/downloads)
$ tar -xvf protobuf-2.6.1.tar.gz -C app/
// Delete the archive
$ rm protobuf-2.6.1.tar.gz
// Install the C++ compiler related package
$ sudo apt-get install g++
// Compile and install protobuf
$ cd app/protobuf-2.6.1/
$ ./configure
$ make
$ make check
$ sudo make install
// Add to lib
$ vim ~/.bashrc
export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:/usr/local/lib
$ source ~/.bashrc
// Verify installation
$ protoc --version
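If the build and installation succeeded, the command above prints the installed version:
libprotoc 2.6.1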
3.3 Writing proto files
First, you need to write a proto file that defines the structured data to be processed in your program. Proto files are similar to data definitions in Java or C. The following code gives the contents of the endpoint.proto file that defines the RPC interface in the example:
// Define common options
option java_package = "com.hbase.demo.endpoint"; // Specify the package name of the generated Java code
option java_outer_classname = "Sum"; // Specify the outer class name of the generated Java code
option java_generic_services = true; // Generate abstract service code based on service definition
option optimize_for = SPEED; // Specify the optimization level
// Define the request message
message SumRequest {
    required string family = 1;    // column family
    required string column = 2;    // column qualifier
}
// Define the response message
message SumResponse {
    required int64 sum = 1 [default = 0];    // summation result
}
// Define the RPC service
service SumService {
    // Get the summation result
    rpc getSum(SumRequest)
        returns (SumResponse);
}
3.4 Compiling proto files
// Compile the proto file to generate Java code
$ protoc endpoint.proto --java_out=.
// The generated file Sum.java contains the message classes SumRequest and SumResponse and the abstract service class SumService (the full listing is omitted here).
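The generated code is used through the standard protobuf builder pattern. A minimal usage sketch (SumMessageDemo is a hypothetical class name):
import com.google.protobuf.InvalidProtocolBufferException;
import com.hbase.demo.endpoint.Sum;

public class SumMessageDemo {
    public static void main(String[] args) throws InvalidProtocolBufferException {
        // Build a request message with the generated builder API
        Sum.SumRequest request = Sum.SumRequest.newBuilder()
                .setFamily("info")
                .setColumn("score")
                .build();
        // Serialize and parse it back, which is what the RPC layer does under the hood
        byte[] bytes = request.toByteArray();
        Sum.SumRequest parsed = Sum.SumRequest.parseFrom(bytes);
        System.out.println("column: " + parsed.getColumn());  // prints "score"
    }
}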
4 Endpoint Example
Business logic such as summation and sorting is placed on the server side. After the server completes the calculation, the result is sent to the client, which reduces the amount of data transmitted. The following example creates an RPC service on the HBase server side: the server sums the values of a specified column of a specified table and returns the result to the client, which calls the RPC service and prints the response.
4.1 Server Code
First, import the RPC interface file Sum.java generated by Protocol Buffer into the project, then create a new class SumEndPoint to hold the server code:
package com.hbase.demo.endpoint;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.Coprocessor;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.coprocessor.CoprocessorException;
import org.apache.hadoop.hbase.coprocessor.CoprocessorService;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.protobuf.ResponseConverter;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.util.Bytes;
import com.google.protobuf.RpcCallback;
import com.google.protobuf.RpcController;
import com.google.protobuf.Service;
import com.hbase.demo.endpoint.Sum.SumRequest;
import com.hbase.demo.endpoint.Sum.SumResponse;
import com.hbase.demo.endpoint.Sum.SumService;
/**
 * @author developer
 * Description: server-side code for the HBase coprocessor Endpoint
 * Function: extends the RPC service generated by Protocol Buffer; the server fetches the data
 * of the specified column, performs the summation, and returns the result to the client.
 */
public class SumEndPoint extends SumService implements Coprocessor, CoprocessorService {

    private RegionCoprocessorEnvironment env;    // coprocessor environment

    @Override
    public Service getService() {
        return this;
    }

    @Override
    public void getSum(RpcController controller, SumRequest request, RpcCallback<SumResponse> done) {
        // Define the variables
        SumResponse response = null;
        InternalScanner scanner = null;
        // Set up the scan object
        Scan scan = new Scan();
        scan.addFamily(Bytes.toBytes(request.getFamily()));
        scan.addColumn(Bytes.toBytes(request.getFamily()), Bytes.toBytes(request.getColumn()));
        // Scan the current region and sum the values
        try {
            scanner = env.getRegion().getScanner(scan);
            List<Cell> results = new ArrayList<Cell>();
            boolean hasMore = false;
            long sum = 0L;
            do {
                hasMore = scanner.next(results);
                for (Cell cell : results) {
                    sum += Long.parseLong(new String(CellUtil.cloneValue(cell)));
                }
                results.clear();
            } while (hasMore);
            // Set the return result
            response = SumResponse.newBuilder().setSum(sum).build();
        } catch (IOException e) {
            ResponseConverter.setControllerException(controller, e);
        } finally {
            if (scanner != null) {
                try {
                    scanner.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
        // Return the RPC result to the client
        done.run(response);
    }

    // Called when the coprocessor is initialized
    @Override
    public void start(CoprocessorEnvironment env) throws IOException {
        if (env instanceof RegionCoprocessorEnvironment) {
            this.env = (RegionCoprocessorEnvironment) env;
        } else {
            throw new CoprocessorException("Must be loaded on a table region!");
        }
    }

    // Called when the coprocessor is stopped
    @Override
    public void stop(CoprocessorEnvironment env) throws IOException {
        // nothing to do
    }
}
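Note that getSum only sums the cells of the single Region it runs in; the client (see SumClient below) is responsible for adding the per-Region partial results together.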
4.2 Client Code
Create a new class SumClient in the project as a client test program that calls the RPC service. The code is as follows:
package com.hbase.demo.endpoint;
import java.io.IOException;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.coprocessor.Batch;
import org.apache.hadoop.hbase.ipc.BlockingRpcCallback;
import com.google.protobuf.ServiceException;
import com.hbase.demo.endpoint.Sum.SumRequest;
import com.hbase.demo.endpoint.Sum.SumResponse;
import com.hbase.demo.endpoint.Sum.SumService;
/**
 * @author developer
 * Description: client-side code for the HBase coprocessor Endpoint
 * Function: obtains from the server the summation result over the specified column of an HBase table
 */
public class SumClient {
    public static void main(String[] args) throws ServiceException, Throwable {
        long sum = 0L;
        // Configure HBase
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "localhost");
        conf.set("hbase.zookeeper.property.clientPort", "2222");
        // Establish a connection
        Connection conn = ConnectionFactory.createConnection(conf);
        // Get the table
        HTable table = (HTable) conn.getTable(TableName.valueOf("sum_table"));
        // Build the request object
        final SumRequest request = SumRequest.newBuilder().setFamily("info").setColumn("score").build();
        // Invoke the coprocessor on every region and collect the per-region results
        Map<byte[], Long> result = table.coprocessorService(SumService.class, null, null,
                new Batch.Call<SumService, Long>() {
                    @Override
                    public Long call(SumService service) throws IOException {
                        BlockingRpcCallback<SumResponse> rpcCallback = new BlockingRpcCallback<SumResponse>();
                        service.getSum(null, request, rpcCallback);
                        SumResponse response = rpcCallback.get();
                        return response.hasSum() ? response.getSum() : 0L;
                    }
                });
        // Add up the per-region partial sums
        for (Long v : result.values()) {
            sum += v;
        }
        // Output the result
        System.out.println("sum: " + sum);
        // Close the resources
        table.close();
        conn.close();
    }
}
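Passing null for both the start and end row keys in coprocessorService makes the call cover every region of the table; the returned map contains one partial sum per region, keyed by region name.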
4.3 Loading Endpoint
// Package the Sum and SumEndPoint classes into a jar and upload it to HDFS
$ hadoop fs -put endpoint_sum.jar /input
// Modify the HBase configuration file to add the following property
$ vim app/hbase-1.2.0-cdh5.7.1/conf/hbase-site.xml
<property>
    <name>hbase.table.sanity.checks</name>
    <value>false</value>
</property>
// restart hbase
$ stop-hbase.sh
$ start-hbase.sh
// Start the hbase shell
$ hbase shell
// Create a table sum_table
hbase> create 'sum_table', 'info'
// Insert test data
hbase> put 'sum_table', 'rowkey01', 'info:score', '95'
hbase> put 'sum_table', 'rowkey02', 'info:score', '98'
hbase> put 'sum_table', 'rowkey02', 'info:age', '20'
// View the data
hbase> scan 'sum_table'
// Load the coprocessor
hbase> disable 'sum_table'
hbase> alter 'sum_table', METHOD => 'table_att', 'coprocessor' => 'hdfs://localhost:9000/input/endpoint_sum.jar|com.hbase.demo.endpoint.SumEndPoint|100'
hbase> enable 'sum_table'
// To unload the coprocessor, first look up its name with describe and then remove it by command
hbase> disable 'sum_table'
hbase> describe 'sum_table'
hbase> alter 'sum_table', METHOD => 'table_att_unset', NAME => 'coprocessor$1'
hbase> enable 'sum_table'
Running the client program SumClient in Eclipse prints sum: 193 (95 + 98), which matches expectations.
5 Observer Example
Generally, indexing a database requires a separate data structure to store the index data. In an HBase table, besides looking up data by rowkey, you can maintain an additional index table: query the index table first, then query the data table with the result. The following example shows how to use the Observer coprocessor to build a secondary index for an HBase table: the value of column info:name in the data table ob_table is used as the rowkey of the index table index_ob_table, and the value of column info:score in ob_table is stored as the value of column info:score in index_ob_table. Whenever the user inserts data into the data table, the corresponding secondary index row is inserted into the index table automatically, which facilitates querying the business data.
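Once the Observer below keeps index_ob_table up to date, a lookup by name can read the index table directly instead of scanning ob_table. A minimal client sketch (IndexQueryDemo is a hypothetical class name and 'Tom' a made-up info:name value):
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

public class IndexQueryDemo {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        Connection conn = ConnectionFactory.createConnection(conf);
        Table index = conn.getTable(TableName.valueOf("index_ob_table"));
        // The index table's rowkey is the info:name value from the data table
        Get get = new Get(Bytes.toBytes("Tom"));
        Result r = index.get(get);
        byte[] score = r.getValue(Bytes.toBytes("info"), Bytes.toBytes("score"));
        System.out.println("score: " + (score == null ? "not found" : Bytes.toString(score)));
        index.close();
        conn.close();
    }
}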
5.1 Code
In the project, create a new class PutObserver as the Observer coprocessor application logic class. The code is as follows:
package com.hbase.demo.observer;
import java.io.IOException;
import java.util.List;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.util.Bytes;
/**
 * @author developer
 * Description: application logic code for the HBase coprocessor Observer
 * Function: in the HBase table the Observer is applied to, every put operation takes the row's
 * info:name column value as the rowkey and its info:score column value as the value, and writes
 * them to the secondary index table index_ob_table to improve query efficiency on that field
 */
@SuppressWarnings("deprecation")
public class PutObserver extends BaseRegionObserver {

    @Override
    public void postPut(ObserverContext<RegionCoprocessorEnvironment> e,
            Put put, WALEdit edit, Durability durability) throws IOException {
        // Get the secondary index table
        HTableInterface table = e.getEnvironment().getTable(TableName.valueOf("index_ob_table"));
        // Get the values from the current Put
        List<Cell> cellList1 = put.get(Bytes.toBytes("info"), Bytes.toBytes("name"));
        List<Cell> cellList2 = put.get(Bytes.toBytes("info"), Bytes.toBytes("score"));
        // Insert data into the secondary index table
        for (Cell cell1 : cellList1) {
            // The info:name value becomes the rowkey of the secondary index table
            Put indexPut = new Put(CellUtil.cloneValue(cell1));
            for (Cell cell2 : cellList2) {
                // The info:score value becomes the value of column info:score in the secondary index table
                indexPut.add(Bytes.toBytes("info"), Bytes.toBytes("score"), CellUtil.cloneValue(cell2));
            }
            // Only write when the same Put also carried an info:score value; an empty Put is not allowed
            if (!indexPut.isEmpty()) {
                table.put(indexPut);
            }
        }
        // Close the resource
        table.close();
    }
}
5.2 Loading Observer
// Package the PutObserver class and upload it to HDFS
$ hadoop fs -put observer_put.jar /input
// Start the hbase shell
$ hbase shell
// Create the data table ob_table
hbase> create 'ob_table', 'info'
// Create the secondary index table index_ob_table
hbase> create 'index_ob_table', 'info'
// Load the coprocessor
hbase> disable 'ob_table'
hbase> alter 'ob_table', METHOD => 'table_att', 'coprocessor' => 'hdfs://localhost:9000/input/observer_put.jar|com.hbase.demo.observer.PutObserver|100'
hbase> enable 'ob_table'
// View the data table ob_table
hbase> describe 'ob_table'