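Presto is a distributed SQL query engine that is well suited to OLAP workloads. The official distribution does not ship an Oracle plugin, perhaps for copyright reasons, yet Oracle is still very widely used in practice, so it is worth walking through how such a plugin is developed. If you only need to deploy and do not intend to develop, you can clone the copy of Presto that I host on GitHub, then compile and deploy from it.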
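Setting up the development environment

The Presto GitHub home page already provides a tutorial on setting up a development environment, so I will not repeat it here. Two things to note: Presto fails to compile on Windows, and you must build Presto once before you start working on the source. I recommend IntelliJ IDEA as the IDE. If you have imported Presto into the IDE and can run "PrestoServer" successfully, you are already halfway there. Someone has in fact already published a presto-oracle plugin on GitHub, but it only handles simple queries and cannot insert data into Oracle through Presto. It is also not integrated into the Presto source tree, so the plugin cannot be debugged.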
Creating the module

The official source already includes a MySQL plugin, which we can use as a template. Create a new module in the Presto root directory with the following pom:
```xml
<?xml version="1.0"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <parent>
        <groupId>com.facebook.presto</groupId>
        <artifactId>presto-root</artifactId>
        <version>0.157.2-SNAPSHOT</version>
    </parent>

    <artifactId>presto-oracle</artifactId>
    <description>Presto - Oracle Connector</description>
    <packaging>presto-plugin</packaging>

    <properties>
        <!--check license-->
        <air.main.basedir>${project.parent.basedir}</air.main.basedir>
    </properties>

    <dependencies>
        <dependency>
            <groupId>com.facebook.presto</groupId>
            <artifactId>presto-base-jdbc</artifactId>
        </dependency>

        <dependency>
            <groupId>io.airlift</groupId>
            <artifactId>configuration</artifactId>
        </dependency>

        <dependency>
            <groupId>com.google.guava</groupId>
            <artifactId>guava</artifactId>
        </dependency>

        <dependency>
            <groupId>com.google.inject</groupId>
            <artifactId>guice</artifactId>
        </dependency>

        <dependency>
            <groupId>javax.validation</groupId>
            <artifactId>validation-api</artifactId>
        </dependency>

        <dependency>
            <groupId>com.oracle</groupId>
            <artifactId>ojdbc6</artifactId>
        </dependency>

        <dependency>
            <groupId>javax.inject</groupId>
            <artifactId>javax.inject</artifactId>
        </dependency>

        <!-- Presto SPI -->
        <dependency>
            <groupId>com.facebook.presto</groupId>
            <artifactId>presto-spi</artifactId>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>io.airlift</groupId>
            <artifactId>slice</artifactId>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>io.airlift</groupId>
            <artifactId>units</artifactId>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-annotations</artifactId>
            <scope>provided</scope>
        </dependency>

        <!-- for testing -->
        <dependency>
            <groupId>org.testng</groupId>
            <artifactId>testng</artifactId>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>io.airlift</groupId>
            <artifactId>testing</artifactId>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>io.airlift</groupId>
            <artifactId>json</artifactId>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>com.facebook.presto</groupId>
            <artifactId>presto-main</artifactId>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>com.facebook.presto</groupId>
            <artifactId>presto-tpch</artifactId>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>io.airlift.tpch</groupId>
            <artifactId>tpch</artifactId>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>com.facebook.presto</groupId>
            <artifactId>presto-tests</artifactId>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>io.airlift</groupId>
            <artifactId>testing-mysql-server</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-dependency-plugin</artifactId>
                <version>2.10</version>
                <configuration>
                    <!--disable check dependency-->
                    <skip>true</skip>
                    <failOnWarning>${air.check.fail-dependency}</failOnWarning>
                    <ignoreNonCompile>true</ignoreNonCompile>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-checkstyle-plugin</artifactId>
                <executions>
                    <execution>
                        <phase>validate</phase>
                        <goals>
                            <goal>check</goal>
                        </goals>
                        <configuration>
                            <!--disable check code style-->
                            <skip>true</skip>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
```
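Two points need explaining. First, the Oracle JDBC dependency cannot be found in the public Maven repository, so you must download the jar yourself and install it into your local Maven repository. Second, Presto enforces strict code-style and dependency checks, and the build fails if either the code or the dependencies do not pass. Because the Oracle JDBC dependency cannot be resolved from the public repository, the dependency check is bound to fail, so in the pom I disabled both the dependency-check plugin and the code-style plugin.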
1. Disable the dependency check

```xml
<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-dependency-plugin</artifactId>
    <version>2.10</version>
    <configuration>
        <!--disable check dependency-->
        <skip>true</skip>
        <failOnWarning>${air.check.fail-dependency}</failOnWarning>
        <ignoreNonCompile>true</ignoreNonCompile>
    </configuration>
</plugin>
```
2. Disable the code-style check

```xml
<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-checkstyle-plugin</artifactId>
    <executions>
        <execution>
            <phase>validate</phase>
            <goals>
                <goal>check</goal>
            </goals>
            <configuration>
                <!--disable check code style-->
                <skip>true</skip>
            </configuration>
        </execution>
    </executions>
</plugin>
```
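3. Add the ojdbc dependency to presto-root

The module pom declares ojdbc6 without a version, so the version has to be supplied by the root pom (Maven resolves a versionless dependency in a child module from the parent's dependencyManagement section):

```xml
<dependency>
    <groupId>com.oracle</groupId>
    <artifactId>ojdbc6</artifactId>
    <version>11.2.0.4.0-atlassian-hosted</version>
</dependency>
```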
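For the manual install step mentioned earlier, Maven's `install:install-file` goal is the usual route. The sketch below only assembles and prints the command rather than running it. The coordinates are the ones from the dependency above; the jar path and the class name `InstallOjdbcCommand` are placeholders I made up for illustration.

```java
import java.util.Arrays;
import java.util.List;

// Sketch: builds (but does not execute) the Maven command that installs a
// manually downloaded Oracle JDBC jar into the local repository.
final class InstallOjdbcCommand
{
    private InstallOjdbcCommand() {}

    // jarPath is a hypothetical location of the downloaded driver jar.
    static List<String> command(String jarPath)
    {
        return Arrays.asList(
                "mvn", "install:install-file",
                "-Dfile=" + jarPath,
                "-DgroupId=com.oracle",
                "-DartifactId=ojdbc6",
                "-Dversion=11.2.0.4.0-atlassian-hosted",
                "-Dpackaging=jar");
    }

    public static void main(String[] args)
    {
        // Print the command line so it can be pasted into a shell.
        System.out.println(String.join(" ", command("/path/to/ojdbc6.jar")));
    }
}
```

The groupId, artifactId, and version passed here must match the `<dependency>` entry exactly, otherwise Maven will still fail to resolve it.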
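Integrating the module into Presto

Because we are developing against the source tree, some further configuration is needed before presto-oracle can be tested, packaged, and released as part of Presto: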
1. Edit config.properties

The config.properties file is under "presto/presto-main/etc". Add "../presto-oracle/pom.xml" to the plugin.bundles property.
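Presto loads the presto-oracle plugin when debugging in the IDE only if the presto-oracle pom has been added here; otherwise the plugin has no effect. This setting is for the development environment only and is not needed in production.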
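As a rough illustration of the plugin.bundles edit, the sketch below parses a made-up excerpt of config.properties with `java.util.Properties` and lists the resulting bundle entries. The existing `../presto-mysql/pom.xml` entry and the backslash line-continuation layout are assumptions about what the file looks like; only the `../presto-oracle/pom.xml` entry comes from the step above.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

// Sketch: shows how a comma-separated plugin.bundles value is read once the
// presto-oracle pom has been appended. SAMPLE is a hypothetical excerpt.
final class PluginBundlesSketch
{
    static final String SAMPLE =
            "plugin.bundles=\\\n" +
            "  ../presto-mysql/pom.xml,\\\n" +
            "  ../presto-oracle/pom.xml\n";

    private PluginBundlesSketch() {}

    static String[] bundles(String text)
    {
        Properties properties = new Properties();
        try {
            // Properties.load joins lines ending in a backslash and drops
            // the leading whitespace of each continuation line.
            properties.load(new StringReader(text));
        }
        catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return properties.getProperty("plugin.bundles").split("\\s*,\\s*");
    }

    public static void main(String[] args)
    {
        for (String bundle : bundles(SAMPLE)) {
            System.out.println(bundle);
        }
    }
}
```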
2. Edit presto.xml

The presto.xml file is under "presto/presto-server/src/main/provisio". Add the following:

```xml
<artifactSet to="plugin/oracle">
    <artifact id="${project.groupId}:presto-oracle:zip:${project.version}">
        <unpack />
    </artifact>
</artifactSet>
```
This makes the Presto build place our presto-oracle plugin in the plugin directory (under plugin/oracle).
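Writing the code

The plugin consists of just four classes and is not complicated at all. Note, however, that every source file must carry the license header: Presto configures a license-check plugin, and the build fails if a file lacks the header. This check is far less troublesome than the code-style check, which fails on the slightest irregularity; the license check passes as soon as each class you create includes the header.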
1. OraclePlugin.java

```java
/*
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.facebook.presto.plugin.oracle;

import com.facebook.presto.plugin.jdbc.JdbcPlugin;

public class OraclePlugin
        extends JdbcPlugin
{
    public OraclePlugin()
    {
        super("oracle", new OracleClientModule());
    }
}
```
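The "oracle" string passed to the constructor is presumably the value used later for the name setting in the catalog configuration. I have not verified this guess, so I am leaving it configured this way for now.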
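If that guess holds, a catalog file for this plugin might look like the sample embedded in the sketch below. Everything in it is illustrative: the file name etc/catalog/oracle.properties, the host, service name, and credentials are placeholders, and the `connection-*` keys are my assumption about what `BaseJdbcConfig` reads. The sketch only parses the sample to show the key/value shape.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

// Sketch: a hypothetical etc/catalog/oracle.properties, parsed with java.util.Properties.
final class CatalogFileSketch
{
    // Placeholder values throughout; connector.name is assumed to match the
    // "oracle" string passed to the JdbcPlugin constructor.
    static final String SAMPLE =
            "connector.name=oracle\n" +
            "connection-url=jdbc:oracle:thin:@//dbhost:1521/ORCL\n" +
            "connection-user=presto\n" +
            "connection-password=secret\n";

    private CatalogFileSketch() {}

    static Properties parse(String text)
    {
        Properties properties = new Properties();
        try {
            properties.load(new StringReader(text));
        }
        catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return properties;
    }

    public static void main(String[] args)
    {
        Properties catalog = parse(SAMPLE);
        System.out.println("connector.name = " + catalog.getProperty("connector.name"));
        System.out.println("connection-url = " + catalog.getProperty("connection-url"));
    }
}
```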
2. OracleConfig.java

```java
/*
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.facebook.presto.plugin.oracle;

import io.airlift.configuration.Config;

// Holds the Oracle-specific catalog properties; each setter is bound to a
// property name through its @Config annotation.
public class OracleConfig
{
    private String user;
    private String password;
    private String url;

    public String getUser()
    {
        return user;
    }

    @Config("oracle.user")
    public OracleConfig setUser(String user)
    {
        this.user = user;
        return this;
    }

    public String getPassword()
    {
        return password;
    }

    @Config("oracle.password")
    public OracleConfig setPassword(String password)
    {
        this.password = password;
        return this;
    }

    public String getUrl()
    {
        return url;
    }

    // Each setter needs its own property name.
    @Config("oracle.url")
    public OracleConfig setUrl(String url)
    {
        this.url = url;
        return this;
    }
}
```
Do not omit the annotations in the code above; without them Presto cannot find these properties when it loads the catalog.
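To see why a missing annotation makes a property invisible, here is a minimal, self-contained sketch of annotation-driven binding. It is not how airlift's configuration library is implemented: `Key`, `DemoConfig`, and `bind` are hypothetical stand-ins that only illustrate the idea that a binder discovers setters through their annotations.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.Map;

final class ConfigBindingSketch
{
    private ConfigBindingSketch() {}

    // Hypothetical stand-in for a @Config-style annotation.
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.METHOD)
    @interface Key
    {
        String value();
    }

    static class DemoConfig
    {
        private String user;
        private String url;

        public String getUser() { return user; }

        @Key("oracle.user")
        public DemoConfig setUser(String user) { this.user = user; return this; }

        public String getUrl() { return url; }

        // Deliberately not annotated: the binder below never calls this setter.
        public DemoConfig setUrl(String url) { this.url = url; return this; }
    }

    // Calls every annotated setter whose key appears in the property map.
    static <T> T bind(T target, Map<String, String> properties)
    {
        for (Method method : target.getClass().getMethods()) {
            Key key = method.getAnnotation(Key.class);
            if (key != null && properties.containsKey(key.value())) {
                try {
                    method.invoke(target, properties.get(key.value()));
                }
                catch (ReflectiveOperationException e) {
                    throw new IllegalStateException(e);
                }
            }
        }
        return target;
    }

    public static void main(String[] args)
    {
        Map<String, String> properties = new HashMap<>();
        properties.put("oracle.user", "scott");
        properties.put("oracle.url", "jdbc:oracle:thin:@//dbhost:1521/ORCL");
        DemoConfig config = bind(new DemoConfig(), properties);
        System.out.println("user = " + config.getUser());
        System.out.println("url  = " + config.getUrl());
    }
}
```

In this sketch the `oracle.url` entry is silently ignored because its setter carries no annotation, which is the same kind of failure the warning above is about.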
3. OracleClientModule.java

```java
/*
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.facebook.presto.plugin.oracle;

import com.facebook.presto.plugin.jdbc.BaseJdbcConfig;
import com.facebook.presto.plugin.jdbc.JdbcClient;
import com.google.inject.Binder;
import com.google.inject.Module;
import com.google.inject.Scopes;

import static io.airlift.configuration.ConfigBinder.configBinder;

public class OracleClientModule
        implements Module
{
    @Override
    public void configure(Binder binder)
    {
        // Use OracleClient wherever a JdbcClient is injected.
        binder.bind(JdbcClient.class).to(OracleClient.class).in(Scopes.SINGLETON);
        // Bind both the generic JDBC settings and the Oracle-specific ones.
        configBinder(binder).bindConfig(BaseJdbcConfig.class);
        configBinder(binder).bindConfig(OracleConfig.class);
    }
}
```
4. OracleClient.java

```java
/*
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.facebook.presto.plugin.oracle;

import com.facebook.presto.plugin.jdbc.*;
import com.facebook.presto.spi.*;
import com.facebook.presto.spi.type.Type;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import oracle.jdbc.OracleDriver;

import javax.annotation.Nullable;
import javax.inject.Inject;

import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

import static com.facebook.presto.plugin.jdbc.JdbcErrorCode.JDBC_ERROR;
import static com.facebook.presto.spi.StandardErrorCode.NOT_SUPPORTED;
import static com.google.common.collect.Iterables.getOnlyElement;

public class OracleClient
        extends BaseJdbcClient
{
    @Inject
    public OracleClient(JdbcConnectorId connectorId, BaseJdbcConfig config, OracleConfig oracleConfig)
            throws SQLException
    {
        // An empty identifier quote means generated SQL uses unquoted names.
        super(connectorId, config, "", new OracleDriver());
    }

    @Override
    public Set<String> getSchemaNames()
    {
        try (Connection connection = driver.connect(connectionUrl, connectionProperties);
                ResultSet resultSet = connection.getMetaData().getSchemas()) {
            ImmutableSet.Builder<String> schemaNames = ImmutableSet.builder();
            while (resultSet.next()) {
                // Oracle reports upper-case names; Presto works with lower case.
                String schemaName = resultSet.getString(1).toLowerCase();
                schemaNames.add(schemaName);
            }
            return schemaNames.build();
        }
        catch (SQLException e) {
            throw Throwables.propagate(e);
        }
    }

    @Override
    protected ResultSet getTables(Connection connection, String schemaName, String tableName)
            throws SQLException
    {
        // Include synonyms as well as ordinary tables.
        return connection.getMetaData().getTables(null, schemaName, tableName, new String[] {"TABLE", "SYNONYM"});
    }

    @Nullable
    @Override
    public JdbcTableHandle getTableHandle(SchemaTableName schemaTableName)
    {
        try (Connection connection = driver.connect(connectionUrl, connectionProperties)) {
            DatabaseMetaData metadata = connection.getMetaData();
            String jdbcSchemaName = schemaTableName.getSchemaName();
            String jdbcTableName = schemaTableName.getTableName();
            if (metadata.storesUpperCaseIdentifiers()) {
                jdbcSchemaName = jdbcSchemaName.toUpperCase();
                jdbcTableName = jdbcTableName.toUpperCase();
            }
            try (ResultSet resultSet = getTables(connection, jdbcSchemaName, jdbcTableName)) {
                List<JdbcTableHandle> tableHandles = new ArrayList<>();
                while (resultSet.next()) {
                    tableHandles.add(new JdbcTableHandle(
                            connectorId,
                            schemaTableName,
                            resultSet.getString("TABLE_CAT"),
                            resultSet.getString("TABLE_SCHEM"),
                            resultSet.getString("TABLE_NAME")));
                }
                if (tableHandles.isEmpty()) {
                    return null;
                }
                if (tableHandles.size() > 1) {
                    throw new PrestoException(NOT_SUPPORTED, "Multiple tables matched: " + schemaTableName);
                }
                return getOnlyElement(tableHandles);
            }
        }
        catch (SQLException e) {
            throw Throwables.propagate(e);
        }
    }

    @Override
    public List<JdbcColumnHandle> getColumns(JdbcTableHandle tableHandle)
    {
        try (Connection connection = driver.connect(connectionUrl, connectionProperties)) {
            // Ask the driver to resolve synonyms when reading column metadata.
            ((oracle.jdbc.driver.OracleConnection) connection).setIncludeSynonyms(true);
            DatabaseMetaData metadata = connection.getMetaData();
            String schemaName = tableHandle.getSchemaName().toUpperCase();
            String tableName = tableHandle.getTableName().toUpperCase();
            try (ResultSet resultSet = metadata.getColumns(null, schemaName, tableName, null)) {
                List<JdbcColumnHandle> columns = new ArrayList<>();
                boolean found = false;
                while (resultSet.next()) {
                    found = true;
                    Type columnType = toPrestoType(resultSet.getInt("DATA_TYPE"), resultSet.getInt("COLUMN_SIZE"));
                    // Skip columns whose type has no Presto equivalent.
                    if (columnType != null) {
                        String columnName = resultSet.getString("COLUMN_NAME");
                        columns.add(new JdbcColumnHandle(connectorId, columnName, columnType));
                    }
                }
                if (!found) {
                    throw new TableNotFoundException(tableHandle.getSchemaTableName());
                }
                if (columns.isEmpty()) {
                    throw new PrestoException(NOT_SUPPORTED, "Table has no supported column types: " + tableHandle.getSchemaTableName());
                }
                return ImmutableList.copyOf(columns);
            }
        }
        catch (SQLException e) {
            throw Throwables.propagate(e);
        }
    }

    @Override
    public List<SchemaTableName> getTableNames(@Nullable String schema)
    {
        try (Connection connection = driver.connect(connectionUrl, connectionProperties)) {
            DatabaseMetaData metadata = connection.getMetaData();
            if (metadata.storesUpperCaseIdentifiers() && (schema != null)) {
                schema = schema.toUpperCase();
            }
            try (ResultSet resultSet = getTables(connection, schema, null)) {
                ImmutableList.Builder<SchemaTableName> list = ImmutableList.builder();
                while (resultSet.next()) {
                    list.add(getSchemaTableName(resultSet));
                }
                return list.build();
            }
        }
        catch (SQLException e) {
            throw Throwables.propagate(e);
        }
    }

    @Override
    protected SchemaTableName getSchemaTableName(ResultSet resultSet)
            throws SQLException
    {
        String tableSchema = resultSet.getString("TABLE_SCHEM");
        String tableName = resultSet.getString("TABLE_NAME");
        if (tableSchema != null) {
            tableSchema = tableSchema.toLowerCase();
        }
        if (tableName != null) {
            tableName = tableName.toLowerCase();
        }
        return new SchemaTableName(tableSchema, tableName);
    }

    @Override
    public void commitCreateTable(JdbcOutputTableHandle handle)
    {
        StringBuilder sql = new StringBuilder()
                .append("ALTER TABLE ")
                .append(quoted(handle.getCatalogName(), handle.getSchemaName(), handle.getTemporaryTableName()))
                .append(" RENAME TO ")
                // The new table name must not be qualified with catalog or schema.
                .append(handle.getTableName());

        try (Connection connection = getConnection(handle)) {
            execute(connection, sql.toString());
        }
        catch (SQLException e) {
            throw new PrestoException(JDBC_ERROR, e);
        }
    }
}
```
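The code above overrides quite a few methods, mainly because each database has its own rules and we have to adapt to them one by one. As mentioned earlier, someone has already published a presto-oracle plugin on GitHub, but that plugin is not fully adapted. For example, I additionally override commitCreateTable, because when renaming a table in Oracle the new table name must not be prefixed with the schema, otherwise an error is raised.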
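The shape of the rename statement can be sketched in isolation. The helper names below (`qualified`, `renameSql`) are hypothetical simplifications of what commitCreateTable does, and the schema and table names are made up; names are left unquoted because the client passes an empty identifier quote to its superclass.

```java
// Sketch: builds an Oracle rename statement where only the source table is
// schema-qualified and the target is a bare name.
final class RenameSqlSketch
{
    private RenameSqlSketch() {}

    // Simplified stand-in for a quoted(catalog, schema, table) helper.
    static String qualified(String schema, String table)
    {
        return (schema == null || schema.isEmpty()) ? table : schema + "." + table;
    }

    static String renameSql(String schema, String temporaryTable, String finalTable)
    {
        return "ALTER TABLE " + qualified(schema, temporaryTable) + " RENAME TO " + finalTable;
    }

    public static void main(String[] args)
    {
        // prints: ALTER TABLE HR.TMP_PRESTO_0123 RENAME TO ORDERS
        System.out.println(renameSql("HR", "TMP_PRESTO_0123", "ORDERS"));
    }
}
```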
5. BaseJdbcClient.java

"BaseJdbcClient" is the base class of "OracleClient". One of its methods creates a temporary table, and Oracle limits table names to 30 characters, so I truncate the temporary table name:
```java
private JdbcOutputTableHandle beginWriteTable(ConnectorTableMetadata tableMetadata)
{
    SchemaTableName schemaTableName = tableMetadata.getTable();
    String schema = schemaTableName.getSchemaName();
    String table = schemaTableName.getTableName();

    if (!getSchemaNames().contains(schema)) {
        throw new PrestoException(NOT_FOUND, "Schema not found: " + schema);
    }

    try (Connection connection = driver.connect(connectionUrl, connectionProperties)) {
        boolean uppercase = connection.getMetaData().storesUpperCaseIdentifiers();
        if (uppercase) {
            schema = schema.toUpperCase(ENGLISH);
            table = table.toUpperCase(ENGLISH);
        }
        String catalog = connection.getCatalog();

        String temporaryName = "tmp_presto_" + UUID.randomUUID().toString().replace("-", "");
        // Truncate so the name fits within Oracle's table-name length limit.
        temporaryName = temporaryName.substring(0, 29);
        StringBuilder sql = new StringBuilder()
                .append("CREATE TABLE ")
                .append(quoted(catalog, schema, temporaryName))
                .append(" (");
        ImmutableList.Builder<String> columnNames = ImmutableList.builder();
        ImmutableList.Builder<Type> columnTypes = ImmutableList.builder();
        ImmutableList.Builder<String> columnList = ImmutableList.builder();
        for (ColumnMetadata column : tableMetadata.getColumns()) {
            String columnName = column.getName();
            if (uppercase) {
                columnName = columnName.toUpperCase(ENGLISH);
            }
            columnNames.add(columnName);
            columnTypes.add(column.getType());
            columnList.add(new StringBuilder()
                    .append(quoted(columnName))
                    .append(" ")
                    .append(toSqlType(column.getType()))
                    .toString());
        }
        Joiner.on(", ").appendTo(sql, columnList.build());
        sql.append(")");

        execute(connection, sql.toString());

        return new JdbcOutputTableHandle(
                connectorId,
                catalog,
                schema,
                table,
                columnNames.build(),
                columnTypes.build(),
                temporaryName,
                connectionUrl,
                fromProperties(connectionProperties));
    }
    catch (SQLException e) {
        throw new PrestoException(JDBC_ERROR, e);
    }
}
```
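The truncation step can be tried on its own with the sketch below. It assumes the 30-character limit stated above and cuts at exactly that length, whereas the method above uses `substring(0, 29)` and so keeps 29 characters; the class and constant names are mine.

```java
import java.util.UUID;

// Sketch: generates a temporary table name and trims it to an assumed
// 30-character identifier limit.
final class TemporaryTableNameSketch
{
    // Limit taken from the text; treat it as an assumption for other Oracle versions.
    static final int MAX_IDENTIFIER_LENGTH = 30;

    private TemporaryTableNameSketch() {}

    static String temporaryName()
    {
        // 11-character prefix plus 32 hex digits gives 43 characters before trimming.
        String name = "tmp_presto_" + UUID.randomUUID().toString().replace("-", "");
        return name.length() <= MAX_IDENTIFIER_LENGTH ? name : name.substring(0, MAX_IDENTIFIER_LENGTH);
    }

    public static void main(String[] args)
    {
        String name = temporaryName();
        System.out.println(name + " (" + name.length() + " characters)");
    }
}
```

Trimming keeps only part of the random suffix, so the names are somewhat less unique than a full UUID, which is worth keeping in mind if many tables are created concurrently.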
Building and packaging

If everything above went well, rebuild Presto. A successful build produces two kinds of installation packages, tar.gz and rpm.
1. tar.gz file path: presto/presto-server/target
2. rpm file path: presto/presto-server-rpm/target
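If you run into problems with any of the steps above, refer to the copy of Presto that I host, or leave a comment and we can work through it together.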