Presto-Oracle Plugin Development Tutorial

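Presto is a distributed SQL query engine that is well suited to OLAP workloads. The project does not ship an official Oracle connector, possibly for licensing reasons, yet Oracle is still very widely used in practice, so it is worth walking through how such a plugin is developed. Readers who only want to deploy it, without doing any development, can clone the Presto fork I host on GitHub and build and deploy that.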

Setting Up the Development Environment

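The Presto GitHub home page already explains how to set up a development environment, so I won't repeat it here. Keep in mind, though, that Presto fails to build on Windows, and that you must build Presto once before you start working on the source. IntelliJ IDEA is the recommended IDE.

If you have imported Presto into the IDE and can run PrestoServer successfully, you are already halfway there. Someone has in fact published a presto-oracle plugin on GitHub, but it only supports simple queries: it cannot insert data into Oracle through Presto. It is also not integrated into the Presto source tree, so the plugin cannot be debugged.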

Creating the Module

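The official MySQL plugin makes a good template. Create a new module in the Presto root directory (and make sure it is listed in the <modules> section of the root pom.xml). The module's pom is as follows: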

<?xml version="1.0"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <parent>
        <groupId>com.facebook.presto</groupId>
        <artifactId>presto-root</artifactId>
        <version>0.157.2-SNAPSHOT</version>
    </parent>

    <artifactId>presto-oracle</artifactId>
    <description>Presto - Oracle Connector</description>
    <packaging>presto-plugin</packaging>

    <properties>
        <!--check license-->
        <air.main.basedir>${project.parent.basedir}</air.main.basedir>
    </properties>

    <dependencies>
        <dependency>
            <groupId>com.facebook.presto</groupId>
            <artifactId>presto-base-jdbc</artifactId>
        </dependency>

        <dependency>
            <groupId>io.airlift</groupId>
            <artifactId>configuration</artifactId>
        </dependency>

        <dependency>
            <groupId>com.google.guava</groupId>
            <artifactId>guava</artifactId>
        </dependency>

        <dependency>
            <groupId>com.google.inject</groupId>
            <artifactId>guice</artifactId>
        </dependency>

        <dependency>
            <groupId>javax.validation</groupId>
            <artifactId>validation-api</artifactId>
        </dependency>

        <dependency>
            <groupId>com.oracle</groupId>
            <artifactId>ojdbc6</artifactId>
        </dependency>


        <dependency>
            <groupId>javax.inject</groupId>
            <artifactId>javax.inject</artifactId>
        </dependency>

        <!-- Presto SPI -->
        <dependency>
            <groupId>com.facebook.presto</groupId>
            <artifactId>presto-spi</artifactId>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>io.airlift</groupId>
            <artifactId>slice</artifactId>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>io.airlift</groupId>
            <artifactId>units</artifactId>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-annotations</artifactId>
            <scope>provided</scope>
        </dependency>

        <!-- for testing -->
        <dependency>
            <groupId>org.testng</groupId>
            <artifactId>testng</artifactId>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>io.airlift</groupId>
            <artifactId>testing</artifactId>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>io.airlift</groupId>
            <artifactId>json</artifactId>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>com.facebook.presto</groupId>
            <artifactId>presto-main</artifactId>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>com.facebook.presto</groupId>
            <artifactId>presto-tpch</artifactId>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>io.airlift.tpch</groupId>
            <artifactId>tpch</artifactId>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>com.facebook.presto</groupId>
            <artifactId>presto-tests</artifactId>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>io.airlift</groupId>
            <artifactId>testing-mysql-server</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>

        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-dependency-plugin</artifactId>
                <version>2.10</version>
                <configuration>
                    <!--disable check dependency-->
                    <skip>true</skip>
                    <failOnWarning>${air.check.fail-dependency}</failOnWarning>
                    <ignoreNonCompile>true</ignoreNonCompile>
                </configuration>
            </plugin>

            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-checkstyle-plugin</artifactId>
                <executions>
                    <execution>
                        <phase>validate</phase>
                        <goals>
                            <goal>check</goal>
                        </goals>
                        <configuration>
                            <!--disable check code style-->
                            <skip>true</skip>
                        </configuration>
                    </execution>
                </executions>

            </plugin>

        </plugins>
    </build>

</project>

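One thing to note: the Oracle JDBC driver cannot be found in the public Maven repositories, so you have to download the jar yourself and install it into your local Maven repository (for example with mvn install:install-file). In addition, Presto enforces strict code-style and dependency checks, and the build fails if either check does not pass. Because the Oracle JDBC dependency is not available from the public repositories, the dependency check cannot pass, so I disabled both the code-style (checkstyle) plugin and the dependency-check plugin in the pom file.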

  • Disable the dependency check
    <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-dependency-plugin</artifactId>
        <version>2.10</version>
        <configuration>
            <!--disable check dependency-->
            <skip>true</skip>
            <failOnWarning>${air.check.fail-dependency}</failOnWarning>
            <ignoreNonCompile>true</ignoreNonCompile>
        </configuration>
    </plugin>

  • Disable the code-style check
    <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-checkstyle-plugin</artifactId>
        <executions>
            <execution>
                <phase>validate</phase>
                <goals>
                    <goal>check</goal>
                </goals>
                <configuration>
                    <!--disable check code style-->
                    <skip>true</skip>
                </configuration>
            </execution>
        </executions>
    </plugin>

  • Declare the ojdbc dependency in the presto-root pom.xml, inside <dependencyManagement>, because the module pom above does not specify a version (the version must match the one you used when installing the jar locally)
    <dependency>
        <groupId>com.oracle</groupId>
        <artifactId>ojdbc6</artifactId>
        <version>11.2.0.4.0-atlassian-hosted</version>
    </dependency>
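Once the jar is installed and declared, a quick sanity check is whether the driver class really resolves on the module's classpath. This is a minimal sketch, assuming the driver class is oracle.jdbc.OracleDriver (the class OracleClient imports later on); DriverCheck is a hypothetical helper, not part of Presto or of the Oracle driver.

```java
import java.sql.Driver;

/** Hypothetical helper (not part of Presto): checks whether a JDBC driver class can be loaded. */
final class DriverCheck
{
    private DriverCheck()
    {
    }

    /** Returns true if className is on the classpath and implements java.sql.Driver. */
    static boolean isJdbcDriverAvailable(String className)
    {
        try {
            // initialize = false: resolve the class without running its static initializer
            Class<?> clazz = Class.forName(className, false, DriverCheck.class.getClassLoader());
            return Driver.class.isAssignableFrom(clazz);
        }
        catch (ClassNotFoundException e) {
            return false;
        }
    }
}
```

Called from a test in the presto-oracle module, DriverCheck.isJdbcDriverAvailable("oracle.jdbc.OracleDriver") should return true once ojdbc6 resolves; false usually points to a missing local install or a version mismatch with presto-root.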
    

Integrating the Module into Presto

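Because we are developing inside the Presto source tree, the following configuration is also needed to integrate presto-oracle into Presto for testing and for packaging a release: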

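  • Edit config.properties

    config.properties is located in presto/presto-main/etc. Add ../presto-oracle/pom.xml to the plugin.bundles list.

    Only with this pom entry does Presto load the presto-oracle plugin when debugging in the IDE; without it, the plugin is ignored. This setting is only for the development environment and is not needed in production.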

  • Edit presto.xml

    presto.xml is located in presto/presto-server/src/main/provisio. Add the following configuration:

    <artifactSet to="plugin/oracle">
        <artifact id="${project.groupId}:presto-oracle:zip:${project.version}">
            <unpack />
        </artifact>
    </artifactSet>

    This makes the Presto build add our presto-oracle plugin to the plugin directory (as plugin/oracle).
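A missing plugin.bundles entry is easy to overlook, so a small check can confirm that the development config really lists the module. This is a sketch assuming plugin.bundles is a comma-separated list that may use backslash line continuations, as the development config.properties does; BundleCheck is a hypothetical helper.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

/** Hypothetical helper: inspects the plugin.bundles entry of a config.properties text. */
final class BundleCheck
{
    private BundleCheck()
    {
    }

    /** Returns the trimmed, non-empty entries of plugin.bundles (empty list if absent). */
    static List<String> pluginBundles(String configText)
    {
        Properties properties = new Properties();
        try {
            // Properties.load joins backslash-continued lines and skips their leading whitespace
            properties.load(new StringReader(configText));
        }
        catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        List<String> bundles = new ArrayList<>();
        for (String entry : properties.getProperty("plugin.bundles", "").split(",")) {
            String trimmed = entry.trim();
            if (!trimmed.isEmpty()) {
                bundles.add(trimmed);
            }
        }
        return bundles;
    }

    /** True if the presto-oracle pom is among the bundles loaded by the IDE run. */
    static boolean includesOracle(String configText)
    {
        return pluginBundles(configText).contains("../presto-oracle/pom.xml");
    }
}
```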

Writing the Code

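The plugin code consists of just four classes and is not complicated at all. Note, however, that every source file must contain license information: Presto configures a license-check plugin, and a file without a license header causes a build error. This check is far less troublesome than the code-style check, which fails on the slightest deviation; the license check passes as long as every class you add starts with the license header.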
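The real check is performed during the build by the license plugin inherited from Presto's parent pom. As a quick pre-build sanity check, the opening lines of each new file can be compared against the header used in this tutorial. This is only a rough approximation under that assumption; LicenseCheck is hypothetical, and it is deliberately strict about the leading " * " prefix.

```java
/** Hypothetical pre-build helper: approximates the license check by comparing header lines. */
final class LicenseCheck
{
    private LicenseCheck()
    {
    }

    // Opening lines of the Apache 2.0 header used by the classes in this tutorial
    private static final String[] HEADER_START = {
            "/*",
            " * Licensed under the Apache License, Version 2.0 (the \"License\");",
    };

    /** Returns true if the source text starts with the expected header lines. */
    static boolean hasApacheHeader(String source)
    {
        String[] lines = source.split("\r?\n");
        if (lines.length < HEADER_START.length) {
            return false;
        }
        for (int i = 0; i < HEADER_START.length; i++) {
            // ignore trailing whitespace, but keep the leading " * " prefix significant
            String line = lines[i].replaceAll("\\s+$", "");
            if (!line.equals(HEADER_START[i])) {
                return false;
            }
        }
        return true;
    }
}
```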

  • OraclePlugin.java
    /*
     * Licensed under the Apache License, Version 2.0 (the "License");
     * you may not use this file except in compliance with the License.
     * You may obtain a copy of the License at
     *
     *     http://www.apache.org/licenses/LICENSE-2.0
     *
     * Unless required by applicable law or agreed to in writing, software
     * distributed under the License is distributed on an "AS IS" BASIS,
     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     * See the License for the specific language governing permissions and
     * limitations under the License.
     */
    package com.facebook.presto.plugin.oracle;
    
    import com.facebook.presto.plugin.jdbc.JdbcPlugin;
    
    public class OraclePlugin
          extends JdbcPlugin
    {
      public OraclePlugin()
      {
          super("oracle", new OracleClientModule());
      }
    }
    
    

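    The "oracle" string passed to the constructor above is the connector name: JdbcPlugin registers its connector factory under this name, and a catalog file in etc/catalog selects the connector with connector.name=oracle.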

  • OracleConfig.java

    /*
     * Licensed under the Apache License, Version 2.0 (the "License");
     * you may not use this file except in compliance with the License.
     * You may obtain a copy of the License at
     *
     *     http://www.apache.org/licenses/LICENSE-2.0
     *
     * Unless required by applicable law or agreed to in writing, software
     * distributed under the License is distributed on an "AS IS" BASIS,
     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     * See the License for the specific language governing permissions and
     * limitations under the License.
     */
    package com.facebook.presto.plugin.oracle;

    import io.airlift.configuration.Config;

    public class OracleConfig {
        private String user;
        private String password;
        private String url;

        public String getUser() {
            return user;
        }

        @Config("oracle.user")
        public OracleConfig setUser(String user) {
            this.user = user;
            return this;
        }

        public String getPassword() {
            return password;
        }

        @Config("oracle.password")
        public OracleConfig setPassword(String password) {
            this.password = password;
            return this;
        }

        public String getUrl() {
            return url;
        }

        // each setter needs its own key; the URL must not reuse "oracle.password"
        @Config("oracle.url")
        public OracleConfig setUrl(String url) {
            this.url = url;
            return this;
        }
    }
    

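    Do not leave out the annotations in the code above: without them, Presto cannot find these properties when it loads the catalog. Each setter must also be annotated with its own, distinct property key.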
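To see why the annotations matter, it helps to picture the binding step: each annotated setter is registered under its key, and the catalog properties are applied through that map, so a setter without an annotation is simply invisible. The sketch below imitates this with plain reflection so that it runs without airlift. ConfigKey, SampleOracleConfig, DuplicateKeyConfig and MiniConfigBinder are hypothetical stand-ins, not airlift's API; treating a duplicated key and an unknown property as errors is this sketch's own choice, and unlike the real framework it only knows about a single config class.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for io.airlift.configuration.Config (not the real annotation). */
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@interface ConfigKey
{
    String value();
}

/** Mirrors OracleConfig from this tutorial, with one distinct key per setter. */
class SampleOracleConfig
{
    private String user;
    private String password;
    private String url;

    public String getUser() { return user; }

    public String getPassword() { return password; }

    public String getUrl() { return url; }

    @ConfigKey("oracle.user")
    public SampleOracleConfig setUser(String user) { this.user = user; return this; }

    @ConfigKey("oracle.password")
    public SampleOracleConfig setPassword(String password) { this.password = password; return this; }

    @ConfigKey("oracle.url")
    public SampleOracleConfig setUrl(String url) { this.url = url; return this; }
}

/** Hypothetical: reproduces the copy-paste mistake of two setters sharing one key. */
class DuplicateKeyConfig
{
    @ConfigKey("oracle.password")
    public DuplicateKeyConfig setPassword(String password) { return this; }

    @ConfigKey("oracle.password")
    public DuplicateKeyConfig setUrl(String url) { return this; }
}

/** Hypothetical, much simplified imitation of mapping property keys to setters. */
final class MiniConfigBinder
{
    private MiniConfigBinder()
    {
    }

    /** Collects annotated setters by key; a key used twice is reported as an error. */
    static Map<String, Method> settersByKey(Class<?> configClass)
    {
        Map<String, Method> setters = new HashMap<>();
        for (Method method : configClass.getMethods()) {
            ConfigKey key = method.getAnnotation(ConfigKey.class);
            if (key == null) {
                continue; // setters without the annotation are invisible to the binder
            }
            Method previous = setters.put(key.value(), method);
            if (previous != null) {
                throw new IllegalStateException("Property '" + key.value() + "' is bound to both "
                        + previous.getName() + " and " + method.getName());
            }
        }
        return setters;
    }

    /** Creates the config object and applies every property; unknown keys are rejected. */
    static <T> T bind(Class<T> configClass, Map<String, String> properties)
    {
        Map<String, Method> setters = settersByKey(configClass);
        try {
            T instance = configClass.getDeclaredConstructor().newInstance();
            for (Map.Entry<String, String> entry : properties.entrySet()) {
                Method setter = setters.get(entry.getKey());
                if (setter == null) {
                    throw new IllegalArgumentException("Configuration property '" + entry.getKey() + "' was not used");
                }
                setter.invoke(instance, entry.getValue());
            }
            return instance;
        }
        catch (ReflectiveOperationException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

DuplicateKeyConfig reproduces the mistake of annotating setUrl with oracle.password; in this sketch that surfaces immediately as an IllegalStateException instead of silently routing the URL into the wrong field.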

  • OracleClientModule.java

      /*
       * Licensed under the Apache License, Version 2.0 (the "License");
       * you may not use this file except in compliance with the License.
       * You may obtain a copy of the License at
       *
       *     http://www.apache.org/licenses/LICENSE-2.0
       *
       * Unless required by applicable law or agreed to in writing, software
       * distributed under the License is distributed on an "AS IS" BASIS,
       * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
       * See the License for the specific language governing permissions and
       * limitations under the License.
       */
      package com.facebook.presto.plugin.oracle;
    
      import com.facebook.presto.plugin.jdbc.BaseJdbcConfig;
      import com.facebook.presto.plugin.jdbc.JdbcClient;
      import com.google.inject.Binder;
      import com.google.inject.Module;
      import com.google.inject.Scopes;
    
      import static io.airlift.configuration.ConfigBinder.configBinder;
    
      public class OracleClientModule
              implements Module
      {
          @Override
          public void configure(Binder binder)
          {
              binder.bind(JdbcClient.class).to(OracleClient.class).in(Scopes.SINGLETON);
              configBinder(binder).bindConfig(BaseJdbcConfig.class);
              configBinder(binder).bindConfig(OracleConfig.class);
          }
      }
    
    
  • OracleClient.java
    /*
     * Licensed under the Apache License, Version 2.0 (the "License");
     * you may not use this file except in compliance with the License.
     * You may obtain a copy of the License at
     *
     *     http://www.apache.org/licenses/LICENSE-2.0
     *
     * Unless required by applicable law or agreed to in writing, software
     * distributed under the License is distributed on an "AS IS" BASIS,
     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     * See the License for the specific language governing permissions and
     * limitations under the License.
     */
    package com.facebook.presto.plugin.oracle;
    
    
    import com.facebook.presto.plugin.jdbc.*;
    import com.facebook.presto.spi.*;
    import com.facebook.presto.spi.type.Type;
    import com.google.common.base.Joiner;
    import com.google.common.base.Throwables;
    import com.google.common.collect.ImmutableList;
    import com.google.common.collect.ImmutableSet;
    import oracle.jdbc.OracleDriver;
    
    import javax.annotation.Nullable;
    import javax.inject.Inject;
    
    import java.sql.Connection;
    import java.sql.DatabaseMetaData;
    import java.sql.ResultSet;
    import java.sql.SQLException;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.Set;
    import java.util.UUID;
    
    import static com.facebook.presto.plugin.jdbc.JdbcErrorCode.JDBC_ERROR;
    import static com.facebook.presto.spi.StandardErrorCode.NOT_FOUND;
    import static com.facebook.presto.spi.StandardErrorCode.NOT_SUPPORTED;
    import static com.google.common.collect.Iterables.getOnlyElement;
    import static com.google.common.collect.Maps.fromProperties;
    import static java.util.Locale.ENGLISH;
    
    public class OracleClient
          extends BaseJdbcClient
    {
      @Inject
      public OracleClient(JdbcConnectorId connectorId, BaseJdbcConfig config, OracleConfig oracleConfig)
              throws SQLException
      {
          super(connectorId, config, "", new OracleDriver());
      }
    
      @Override
      public Set<String> getSchemaNames() {
          try (Connection connection = driver.connect(connectionUrl,
                  connectionProperties);
               ResultSet resultSet = connection.getMetaData().getSchemas()) {
              ImmutableSet.Builder<String> schemaNames = ImmutableSet.builder();
              while (resultSet.next()) {
                  String schemaName = resultSet.getString(1).toLowerCase();
                  schemaNames.add(schemaName);
              }
              return schemaNames.build();
          } catch (SQLException e) {
              throw Throwables.propagate(e);
          }
      }
    
      @Override
      protected ResultSet getTables(Connection connection, String schemaName, String tableName) throws SQLException {
          return connection.getMetaData().getTables(null, schemaName, tableName,
                  new String[] { "TABLE", "SYNONYM" });
      }
    
      @Nullable
      @Override
      public JdbcTableHandle getTableHandle(SchemaTableName schemaTableName) {
          try (Connection connection = driver.connect(connectionUrl,
                  connectionProperties)) {
              DatabaseMetaData metadata = connection.getMetaData();
              String jdbcSchemaName = schemaTableName.getSchemaName();
              String jdbcTableName = schemaTableName.getTableName();
              if (metadata.storesUpperCaseIdentifiers()) {
                  jdbcSchemaName = jdbcSchemaName.toUpperCase();
                  jdbcTableName = jdbcTableName.toUpperCase();
              }
              try (ResultSet resultSet = getTables(connection, jdbcSchemaName,
                      jdbcTableName)) {
                  List<JdbcTableHandle> tableHandles = new ArrayList<>();
                  while (resultSet.next()) {
                      tableHandles.add(new JdbcTableHandle(connectorId,
                              schemaTableName, resultSet.getString("TABLE_CAT"),
                              resultSet.getString("TABLE_SCHEM"), resultSet
                              .getString("TABLE_NAME")));
                  }
                  if (tableHandles.isEmpty()) {
                      return null;
                  }
                  if (tableHandles.size() > 1) {
                      throw new PrestoException(NOT_SUPPORTED,
                              "Multiple tables matched: " + schemaTableName);
                  }
                  return getOnlyElement(tableHandles);
              }
          } catch (SQLException e) {
              throw Throwables.propagate(e);
          }
      }
    
      @Override
      public List<JdbcColumnHandle> getColumns(JdbcTableHandle tableHandle) {
          try (Connection connection = driver.connect(connectionUrl,
                  connectionProperties)) {
    
              ( (oracle.jdbc.driver.OracleConnection)connection ).setIncludeSynonyms(true);
              DatabaseMetaData metadata = connection.getMetaData();
              String schemaName = tableHandle.getSchemaName().toUpperCase();
              String tableName = tableHandle.getTableName().toUpperCase();
              try (ResultSet resultSet = metadata.getColumns(null, schemaName,
                      tableName, null)) {
                  List<JdbcColumnHandle> columns = new ArrayList<>();
                  boolean found = false;
                  while (resultSet.next()) {
                      found = true;
                      Type columnType = toPrestoType(resultSet
                              .getInt("DATA_TYPE"), resultSet.getInt("COLUMN_SIZE"));
                      if (columnType != null) {
                          String columnName = resultSet.getString("COLUMN_NAME");
                          columns.add(new JdbcColumnHandle(connectorId,
                                  columnName, columnType));
                      }
                  }
                  if (!found) {
                      throw new TableNotFoundException(
                              tableHandle.getSchemaTableName());
                  }
                  if (columns.isEmpty()) {
                      throw new PrestoException(NOT_SUPPORTED,
                              "Table has no supported column types: "
                                      + tableHandle.getSchemaTableName());
                  }
                  return ImmutableList.copyOf(columns);
              }
          } catch (SQLException e) {
              throw Throwables.propagate(e);
          }
      }
    
      @Override
      public List<SchemaTableName> getTableNames(@Nullable String schema) {
          try (Connection connection = driver.connect(connectionUrl,
                  connectionProperties)) {
              DatabaseMetaData metadata = connection.getMetaData();
              if (metadata.storesUpperCaseIdentifiers() && (schema != null)) {
                  schema = schema.toUpperCase();
              }
              try (ResultSet resultSet = getTables(connection, schema, null)) {
                  ImmutableList.Builder<SchemaTableName> list = ImmutableList
                          .builder();
                  while (resultSet.next()) {
                      list.add(getSchemaTableName(resultSet));
                  }
                  return list.build();
              }
          } catch (SQLException e) {
              throw Throwables.propagate(e);
          }
      }
    
      @Override
      protected SchemaTableName getSchemaTableName(ResultSet resultSet) throws SQLException {
          String tableSchema = resultSet.getString("TABLE_SCHEM");
          String tableName = resultSet.getString("TABLE_NAME");
          if (tableSchema != null) {
              tableSchema = tableSchema.toLowerCase();
          }
          if (tableName != null) {
              tableName = tableName.toLowerCase();
          }
          return new SchemaTableName(tableSchema, tableName);
      }
    
      @Override
      public void commitCreateTable(JdbcOutputTableHandle handle) {
          StringBuilder sql = new StringBuilder()
                  .append("ALTER TABLE ")
                  .append(quoted(handle.getCatalogName(), handle.getSchemaName(), handle.getTemporaryTableName()))
                  .append(" RENAME TO ")
                  //new table name needn't to be with catalog and schema
                  .append(handle.getTableName());
    
          try (Connection connection = getConnection(handle)) {
              execute(connection, sql.toString());
          }
          catch (SQLException e) {
              throw new PrestoException(JDBC_ERROR, e);
          }
      }
    
    
    }
    

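    The code above overrides quite a few methods, because every database has its own rules and each one has to be adapted. As mentioned earlier, a presto-oracle plugin is already published on GitHub, but it was not adapted properly. For example, I also overrode commitCreateTable, because in Oracle the new name in a table rename must not be prefixed with the schema; otherwise the statement fails.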
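Most of these adaptations come down to two identifier rules: Presto works with lower-case names while Oracle stores unquoted identifiers in upper case, and the target of ALTER TABLE ... RENAME TO is a bare table name. A minimal stdlib sketch of both rules follows; OracleIdentifiers is a hypothetical class, and like the OracleClient above (which passes an empty identifier quote) it emits unquoted identifiers.

```java
import java.util.Locale;

/** Hypothetical helper summarizing the identifier rules applied in OracleClient. */
final class OracleIdentifiers
{
    private OracleIdentifiers()
    {
    }

    /** Presto name to Oracle dictionary name: unquoted identifiers are stored upper case. */
    static String toOracle(String prestoName)
    {
        return prestoName.toUpperCase(Locale.ENGLISH);
    }

    /** Oracle dictionary name to Presto name, as getSchemaTableName does (null-safe). */
    static String toPresto(String oracleName)
    {
        return oracleName == null ? null : oracleName.toLowerCase(Locale.ENGLISH);
    }

    /** Rename used when committing a table: qualified source, bare target name. */
    static String renameSql(String schema, String temporaryTable, String newTable)
    {
        return "ALTER TABLE " + schema + "." + temporaryTable + " RENAME TO " + newTable;
    }
}
```

Locale.ENGLISH is used for the same reason BaseJdbcClient uses it: case conversion with the default locale can produce unexpected characters (for example the dotless i under a Turkish locale).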

  • BaseJdbcClient.java

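    BaseJdbcClient is the base class of OracleClient. Its beginWriteTable method creates a temporary table; since the method is private, it cannot be overridden in OracleClient, so I modified it in BaseJdbcClient directly. Oracle limits table names to at most 30 characters (before Oracle 12.2), while "tmp_presto_" plus a 32-digit hex UUID is 43 characters long, so I truncate the temporary table name: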

    private JdbcOutputTableHandle beginWriteTable(ConnectorTableMetadata tableMetadata)
     {
         SchemaTableName schemaTableName = tableMetadata.getTable();
         String schema = schemaTableName.getSchemaName();
         String table = schemaTableName.getTableName();
    
         if (!getSchemaNames().contains(schema)) {
             throw new PrestoException(NOT_FOUND, "Schema not found: " + schema);
         }
    
         try (Connection connection = driver.connect(connectionUrl, connectionProperties)) {
             boolean uppercase = connection.getMetaData().storesUpperCaseIdentifiers();
             if (uppercase) {
                 schema = schema.toUpperCase(ENGLISH);
                 table = table.toUpperCase(ENGLISH);
             }
             String catalog = connection.getCatalog();
    
             String temporaryName = "tmp_presto_" + UUID.randomUUID().toString().replace("-", "");
             temporaryName = temporaryName.substring(0, 29);
             StringBuilder sql = new StringBuilder()
                     .append("CREATE TABLE ")
                     .append(quoted(catalog, schema, temporaryName))
                     .append(" (");
             ImmutableList.Builder<String> columnNames = ImmutableList.builder();
             ImmutableList.Builder<Type> columnTypes = ImmutableList.builder();
             ImmutableList.Builder<String> columnList = ImmutableList.builder();
             for (ColumnMetadata column : tableMetadata.getColumns()) {
                 String columnName = column.getName();
                 if (uppercase) {
                     columnName = columnName.toUpperCase(ENGLISH);
                 }
                 columnNames.add(columnName);
                 columnTypes.add(column.getType());
                 columnList.add(new StringBuilder()
                         .append(quoted(columnName))
                         .append(" ")
                         .append(toSqlType(column.getType()))
                         .toString());
             }
             Joiner.on(", ").appendTo(sql, columnList.build());
             sql.append(")");
    
             execute(connection, sql.toString());
    
             return new JdbcOutputTableHandle(
                     connectorId,
                     catalog,
                     schema,
                     table,
                     columnNames.build(),
                     columnTypes.build(),
                     temporaryName,
                     connectionUrl,
                     fromProperties(connectionProperties));
         }
         catch (SQLException e) {
             throw new PrestoException(JDBC_ERROR, e);
         }
     }
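The truncation can also be pulled out into a helper and tested on its own. This is a sketch assuming the 30-character identifier limit of Oracle releases before 12.2 (12.2 raised the limit to 128 bytes); TemporaryTableNames is hypothetical, and it cuts to exactly 30 characters, whereas the code above cuts to 29, which fits as well.

```java
import java.util.UUID;

/** Hypothetical helper isolating the temporary-table naming used in beginWriteTable. */
final class TemporaryTableNames
{
    private TemporaryTableNames()
    {
    }

    /** Identifier length limit of Oracle releases before 12.2. */
    static final int ORACLE_MAX_IDENTIFIER_LENGTH = 30;

    private static final String PREFIX = "tmp_presto_";

    /** Prefix plus the UUID's hex digits, cut down to the identifier limit. */
    static String newTemporaryName()
    {
        String name = PREFIX + UUID.randomUUID().toString().replace("-", "");
        // all characters are ASCII, so the character count equals the byte count
        return name.substring(0, Math.min(name.length(), ORACLE_MAX_IDENTIFIER_LENGTH));
    }
}
```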
    

Building and Packaging

If everything above is in place, rebuild Presto. A successful build produces two installation packages, a tar.gz and an rpm:

  • tar.gz location: presto/presto-server/target

  • rpm location: presto/presto-server-rpm/target


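If you run into problems with any of the steps above, refer to the Presto fork I host on GitHub, or leave a comment so we can work it out together.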
