View Javadoc

1   /*
2    * Copyright (c) 2011.  The Apache Software Foundation
3    *
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *     http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing, software
15   * distributed under the License is distributed on an "AS IS" BASIS,
16   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17   * See the License for the specific language governing permissions and
18   * limitations under the License.
19   */
20  
21  package org.apache.hadoop.hbase.hbql.impl;
22  
23  import org.apache.hadoop.conf.Configuration;
24  import org.apache.hadoop.hbase.HBaseConfiguration;
25  import org.apache.hadoop.hbase.HColumnDescriptor;
26  import org.apache.hadoop.hbase.HTableDescriptor;
27  import org.apache.hadoop.hbase.MasterNotRunningException;
28  import org.apache.hadoop.hbase.ZooKeeperConnectionException;
29  import org.apache.hadoop.hbase.client.HBaseAdmin;
30  import org.apache.hadoop.hbase.client.HTablePool;
31  import org.apache.hadoop.hbase.client.tableindexed.IndexSpecification;
32  import org.apache.hadoop.hbase.client.tableindexed.IndexedTable;
33  import org.apache.hadoop.hbase.client.tableindexed.IndexedTableAdmin;
34  import org.apache.hadoop.hbase.client.tableindexed.IndexedTableDescriptor;
35  import org.apache.hadoop.hbase.hbql.client.AsyncExecutor;
36  import org.apache.hadoop.hbase.hbql.client.AsyncExecutorManager;
37  import org.apache.hadoop.hbase.hbql.client.ExecutionResults;
38  import org.apache.hadoop.hbase.hbql.client.HBatch;
39  import org.apache.hadoop.hbase.hbql.client.HBqlException;
40  import org.apache.hadoop.hbase.hbql.client.HConnection;
41  import org.apache.hadoop.hbase.hbql.client.HMapping;
42  import org.apache.hadoop.hbase.hbql.client.HPreparedStatement;
43  import org.apache.hadoop.hbase.hbql.client.HRecord;
44  import org.apache.hadoop.hbase.hbql.client.HResultSet;
45  import org.apache.hadoop.hbase.hbql.client.HStatement;
46  import org.apache.hadoop.hbase.hbql.client.QueryExecutorPool;
47  import org.apache.hadoop.hbase.hbql.client.QueryExecutorPoolManager;
48  import org.apache.hadoop.hbase.hbql.mapping.AnnotationResultAccessor;
49  import org.apache.hadoop.hbase.hbql.mapping.FamilyMapping;
50  import org.apache.hadoop.hbase.hbql.mapping.TableMapping;
51  import org.apache.hadoop.hbase.hbql.statement.args.KeyInfo;
52  import org.apache.hadoop.hbase.hbql.statement.args.WithArgs;
53  import org.apache.hadoop.hbase.hbql.util.AtomicReferences;
54  import org.apache.hadoop.hbase.hbql.util.Maps;
55  import org.apache.hadoop.hbase.hbql.util.PoolableElement;
56  import org.apache.hadoop.hbase.hbql.util.Sets;
57  import org.apache.hadoop.hbase.util.Bytes;
58  
59  import java.io.IOException;
60  import java.util.List;
61  import java.util.Map;
62  import java.util.Set;
63  import java.util.concurrent.atomic.AtomicBoolean;
64  import java.util.concurrent.atomic.AtomicReference;
65  
66  public class HConnectionImpl extends PoolableElement<HConnectionImpl> implements HConnection {
67  
68      public static final String MAXTABLEREFS = "maxtablerefs";
69      public static final String MASTER       = "hbase.master";
70  
71      private final AtomicBoolean atomicClosed = new AtomicBoolean(false);
72      private final Configuration  configuration;
73      private final HTablePool     tablePool;
74      private final int            maxTablePoolReferencesPerTable;
75      private final MappingManager mappingManager;
76  
77      private final AtomicReference<Map<Class, AnnotationResultAccessor>> atomicAnnoMapping     = AtomicReferences.newAtomicReference();
78      private final AtomicReference<HBaseAdmin>                           atomicHbaseAdmin      = AtomicReferences.newAtomicReference();
79      private final AtomicReference<IndexedTableAdmin>                    atomicIndexTableAdmin = AtomicReferences.newAtomicReference();
80  
81      private String queryExecutorPoolName = null;
82      private String asyncExecutorName     = null;
83  
84      public HConnectionImpl(final Configuration configuration,
85                             final HConnectionPoolImpl connectionPool,
86                             final int maxTablePoolReferencesPerTable) throws HBqlException {
87          super(connectionPool);
88          this.configuration = (configuration == null) ? HBaseConfiguration.create() : configuration;
89          this.maxTablePoolReferencesPerTable = maxTablePoolReferencesPerTable;
90          this.tablePool = new HTablePool(this.getConfiguration(), this.getMaxTablePoolReferencesPerTable());
91          this.mappingManager = new MappingManager(this);
92  
93          this.getMappingManager().validatePersistentMetadata();
94      }
95  
96      public static Configuration getConfiguration(final String master) {
97          final Configuration configuration = new Configuration();
98          configuration.set(HConnectionImpl.MASTER, master);
99          return HBaseConfiguration.create(configuration);
100     }
101 
102     private AtomicReference<Map<Class, AnnotationResultAccessor>> getAtomicAnnoMapping() {
103         return this.atomicAnnoMapping;
104     }
105 
106     public void resetElement() {
107 
108         try {
109             this.getAtomicClosed().set(false);
110             this.getMappingManager().clear();
111         }
112         catch (HBqlException e) {
113             e.printStackTrace();
114         }
115 
116         if (this.getAtomicAnnoMapping().get() != null)
117             this.getAtomicAnnoMapping().get().clear();
118 
119         this.setQueryExecutorPoolName(null);
120     }
121 
122     public boolean isPooled() {
123         return this.getElementPool() != null;
124     }
125 
126     public Configuration getConfiguration() {
127         return this.configuration;
128     }
129 
130     private HTablePool getTablePool() {
131         return this.tablePool;
132     }
133 
134     private MappingManager getMappingManager() throws HBqlException {
135         this.checkIfClosed();
136         return this.mappingManager;
137     }
138 
139     public int getMaxTablePoolReferencesPerTable() {
140         return this.maxTablePoolReferencesPerTable;
141     }
142 
143     private Map<Class, AnnotationResultAccessor> getAnnotationMappingMap() {
144         if (this.getAtomicAnnoMapping().get() == null) {
145             synchronized (this) {
146                 if (this.getAtomicAnnoMapping().get() == null) {
147                     final Map<Class, AnnotationResultAccessor> newmap = Maps.newConcurrentHashMap();
148                     this.getAtomicAnnoMapping().set(newmap);
149                 }
150             }
151         }
152         return this.getAtomicAnnoMapping().get();
153     }
154 
155     public AnnotationResultAccessor getAnnotationMapping(final Object obj) throws HBqlException {
156         return this.getAnnotationMapping(obj.getClass());
157     }
158 
159     public synchronized AnnotationResultAccessor getAnnotationMapping(final Class<?> clazz) throws HBqlException {
160 
161         AnnotationResultAccessor accessor = getAnnotationMappingMap().get(clazz);
162 
163         if (accessor != null) {
164             return accessor;
165         }
166         else {
167             accessor = AnnotationResultAccessor.newAnnotationMapping(this, clazz);
168             getAnnotationMappingMap().put(clazz, accessor);
169             return accessor;
170         }
171     }
172 
173     private AtomicReference<HBaseAdmin> getAtomicHbaseAdmin() {
174         return this.atomicHbaseAdmin;
175     }
176 
177     public HBaseAdmin getHBaseAdmin() throws HBqlException {
178         this.checkIfClosed();
179         if (this.getAtomicHbaseAdmin().get() == null) {
180             synchronized (this) {
181                 if (this.getAtomicHbaseAdmin().get() == null) {
182                     try {
183                         this.getAtomicHbaseAdmin().set(new HBaseAdmin(this.getConfiguration()));
184                     }
185                     catch (MasterNotRunningException e) {
186                         throw new HBqlException(e);
187                     }
188                     catch (ZooKeeperConnectionException e) {
189                         throw new HBqlException(e);
190                     }
191                 }
192             }
193         }
194         return this.getAtomicHbaseAdmin().get();
195     }
196 
197     private AtomicReference<IndexedTableAdmin> getAtomicIndexTableAdmin() {
198         return this.atomicIndexTableAdmin;
199     }
200 
201     public IndexedTableAdmin getIndexTableAdmin() throws HBqlException {
202         this.checkIfClosed();
203         if (this.getAtomicIndexTableAdmin().get() == null) {
204             synchronized (this) {
205                 if (this.getAtomicIndexTableAdmin().get() == null) {
206                     try {
207                         this.getAtomicIndexTableAdmin().set(new IndexedTableAdmin(this.getConfiguration()));
208                     }
209                     catch (MasterNotRunningException e) {
210                         throw new HBqlException(e);
211                     }
212                     catch (ZooKeeperConnectionException e) {
213                         throw new HBqlException(e);
214                     }
215                 }
216             }
217         }
218         return this.getAtomicIndexTableAdmin().get();
219     }
220 
221     public Set<String> getFamilyNames(final String tableName) throws HBqlException {
222         this.checkIfClosed();
223         final HTableDescriptor table = this.getHTableDescriptor(tableName);
224         final Set<String> familySet = Sets.newHashSet();
225         for (final HColumnDescriptor descriptor : table.getColumnFamilies())
226             familySet.add(Bytes.toString(descriptor.getName()));
227         return familySet;
228     }
229 
230     public boolean familyExistsForTable(final String familyName, final String tableName) throws HBqlException {
231         final Set<String> names = this.getFamilyNames(tableName);
232         return names.contains(familyName);
233     }
234 
235     public boolean familyExistsForMapping(final String familyName, final String mappingName) throws HBqlException {
236         final TableMapping mapping = this.getMapping(mappingName);
237         return mapping.containsFamily(familyName);
238     }
239 
240     public IndexedTableDescriptor newIndexedTableDescriptor(final String tableName) throws HBqlException {
241         this.checkIfClosed();
242         try {
243             final HTableDescriptor tableDesc = this.getHTableDescriptor(tableName);
244             return new IndexedTableDescriptor(tableDesc);
245         }
246         catch (IOException e) {
247             throw new HBqlException(e);
248         }
249     }
250 
251     public boolean indexExistsForMapping(final String indexName, final String mappingName) throws HBqlException {
252         final TableMapping mapping = this.getMapping(mappingName);
253         return this.indexExistsForTable(indexName, mapping.getTableName());
254     }
255 
256     public IndexSpecification getIndexForTable(final String indexName, final String tableName) throws HBqlException {
257         final IndexedTableDescriptor indexDesc = this.newIndexedTableDescriptor(tableName);
258         return indexDesc.getIndex(indexName);
259     }
260 
261     public boolean indexExistsForTable(final String indexName, final String tableName) throws HBqlException {
262         this.checkIfClosed();
263         final IndexSpecification index = this.getIndexForTable(indexName, tableName);
264         return index != null;
265     }
266 
267     public void dropIndexForMapping(final String indexName, final String mappingName) throws HBqlException {
268         final TableMapping mapping = this.getMapping(mappingName);
269         this.dropIndexForTable(mapping.getTableName(), indexName);
270     }
271 
272     public void dropIndexForTable(final String tableName, final String indexName) throws HBqlException {
273         this.validateIndexExistsForTable(indexName, tableName);
274         try {
275             final IndexedTableAdmin ita = this.getIndexTableAdmin();
276             ita.removeIndex(tableName.getBytes(), indexName);
277         }
278         catch (IOException e) {
279             throw new HBqlException(e);
280         }
281     }
282 
283     public HStatement createStatement() throws HBqlException {
284         this.checkIfClosed();
285         return new HStatementImpl(this);
286     }
287 
288     public HPreparedStatement prepareStatement(final String sql) throws HBqlException {
289         this.checkIfClosed();
290         return new HPreparedStatementImpl(this, sql);
291     }
292 
293     public void releaseElement() {
294         this.getElementPool().release(this);
295     }
296 
297     private AtomicBoolean getAtomicClosed() {
298         return this.atomicClosed;
299     }
300 
301     public boolean isClosed() {
302         return this.getAtomicClosed().get();
303     }
304 
305     public synchronized void close() {
306         if (!this.isClosed()) {
307             this.getAtomicClosed().set(true);
308             // If it is a pool conection, just give it back to pool (reset() will be called on release)
309             if (this.isPooled())
310                 this.releaseElement();
311         }
312     }
313 
314     private void checkIfClosed() throws HBqlException {
315         if (this.isClosed())
316             throw new HBqlException("Connection is closed");
317     }
318 
319     public ExecutionResults execute(final String sql) throws HBqlException {
320         final HStatement stmt = this.createStatement();
321         return stmt.execute(sql);
322     }
323 
324     public HResultSet<HRecord> executeQuery(final String sql) throws HBqlException {
325         final HStatement stmt = this.createStatement();
326         return stmt.executeQuery(sql);
327     }
328 
329     public <T> HResultSet<T> executeQuery(final String sql, final Class clazz) throws HBqlException {
330         final HStatement stmt = this.createStatement();
331         return stmt.executeQuery(sql, clazz);
332     }
333 
334     public List<HRecord> executeQueryAndFetch(final String sql) throws HBqlException {
335         final HStatement stmt = this.createStatement();
336         return stmt.executeQueryAndFetch(sql);
337     }
338 
339     public <T> List<T> executeQueryAndFetch(final String sql, final Class clazz) throws HBqlException {
340         final HStatement stmt = this.createStatement();
341         return stmt.executeQueryAndFetch(sql, clazz);
342     }
343 
344     public ExecutionResults executeUpdate(final String sql) throws HBqlException {
345         final HStatement stmt = this.createStatement();
346         return stmt.executeUpdate(sql);
347     }
348 
349     // Mapping Routines
350     public boolean mappingExists(final String mappingName) throws HBqlException {
351         return this.getMappingManager().mappingExists(mappingName);
352     }
353 
354     public TableMapping getMapping(final String mappingName) throws HBqlException {
355         return this.getMappingManager().getMapping(mappingName);
356     }
357 
358     public boolean dropMapping(final String mappingName) throws HBqlException {
359         return this.getMappingManager().dropMapping(mappingName);
360     }
361 
362     public Set<HMapping> getAllMappings() throws HBqlException {
363         return this.getMappingManager().getAllMappings();
364     }
365 
366     public TableMapping createMapping(final boolean tempMapping,
367                                       final boolean systemMapping,
368                                       final String mappingName,
369                                       final String tableName,
370                                       final KeyInfo keyInfo,
371                                       final List<FamilyMapping> familyList) throws HBqlException {
372         return this.getMappingManager().createMapping(tempMapping,
373                                                       systemMapping,
374                                                       mappingName,
375                                                       tableName,
376                                                       keyInfo,
377                                                       familyList);
378     }
379 
380     // Table Routines
381     public void createTable(final HTableDescriptor tableDesc) throws HBqlException {
382         this.checkIfClosed();
383         final String tableName = tableDesc.getNameAsString();
384         if (this.tableExists(tableName))
385             throw new HBqlException("Table already exists: " + tableName);
386 
387         try {
388             this.getHBaseAdmin().createTable(tableDesc);
389         }
390         catch (IOException e) {
391             throw new HBqlException(e);
392         }
393     }
394 
395     public HTableWrapper newHTableWrapper(final WithArgs withArgs, final String tableName) throws HBqlException {
396 
397         this.checkIfClosed();
398 
399         try {
400             if (withArgs != null && withArgs.hasAnIndex())
401                 return new HTableWrapper(new IndexedTable(this.getConfiguration(), tableName.getBytes()), null);
402             else
403                 return new HTableWrapper(this.getTablePool().getTable(tableName), this.getTablePool());
404         }
405         catch (IOException e) {
406             throw new HBqlException(e);
407         }
408         catch (RuntimeException e) {
409             throw new HBqlException("Invalid table name: " + tableName);
410         }
411     }
412 
413     public boolean tableExists(final String tableName) throws HBqlException {
414         try {
415             return this.getHBaseAdmin().tableExists(tableName);
416         }
417         catch (MasterNotRunningException e) {
418             throw new HBqlException(e);
419         }
420         catch (IOException e) {
421             throw new HBqlException(e);
422         }
423     }
424 
425     public HTableDescriptor getHTableDescriptor(final String tableName) throws HBqlException {
426         this.validateTableName(tableName);
427         try {
428             return this.getHBaseAdmin().getTableDescriptor(tableName.getBytes());
429         }
430         catch (IOException e) {
431             throw new HBqlException(e);
432         }
433     }
434 
435     public boolean tableAvailable(final String tableName) throws HBqlException {
436         this.validateTableName(tableName);
437         try {
438             return this.getHBaseAdmin().isTableAvailable(tableName);
439         }
440         catch (IOException e) {
441             throw new HBqlException(e);
442         }
443     }
444 
445     public boolean tableEnabled(final String tableName) throws HBqlException {
446         this.validateTableName(tableName);
447         try {
448             return this.getHBaseAdmin().isTableEnabled(tableName);
449         }
450         catch (IOException e) {
451             throw new HBqlException(e);
452         }
453     }
454 
455     public void dropTable(final String tableName) throws HBqlException {
456         validateTableDisabled(tableName, "drop");
457         try {
458             final byte[] tableNameBytes = tableName.getBytes();
459             this.getHBaseAdmin().deleteTable(tableNameBytes);
460         }
461         catch (IOException e) {
462             throw new HBqlException(e);
463         }
464     }
465 
466     public void disableTable(final String tableName) throws HBqlException {
467         try {
468             if (!this.tableEnabled(tableName))
469                 throw new HBqlException("Cannot disable disabled table: " + tableName);
470             this.getHBaseAdmin().disableTable(tableName);
471         }
472         catch (IOException e) {
473             throw new HBqlException(e);
474         }
475     }
476 
477     public void enableTable(final String tableName) throws HBqlException {
478         validateTableDisabled(tableName, "enable");
479         try {
480             this.getHBaseAdmin().enableTable(tableName);
481         }
482         catch (IOException e) {
483             throw new HBqlException(e);
484         }
485     }
486 
487     public Set<String> getTableNames() throws HBqlException {
488         try {
489             final HBaseAdmin admin = this.getHBaseAdmin();
490             final Set<String> tableSet = Sets.newHashSet();
491             for (final HTableDescriptor table : admin.listTables())
492                 tableSet.add(table.getNameAsString());
493             return tableSet;
494         }
495         catch (IOException e) {
496             throw new HBqlException(e);
497         }
498     }
499 
500     // The value returned from this call must be eventually released.
501     public CompletionQueueExecutor getQueryExecutorForConnection() throws HBqlException {
502 
503         if (!Utils.isValidString(this.getQueryExecutorPoolName()))
504             throw new HBqlException("Connection not assigned a QueryExecutorPool name");
505 
506         this.validateQueryExecutorPoolNameExists(this.getQueryExecutorPoolName());
507 
508         final QueryExecutorPool pool = QueryExecutorPoolManager.getQueryExecutorPool(this.getQueryExecutorPoolName());
509         final CompletionQueueExecutor executorQueue = ((QueryExecutorPoolImpl)pool).take();
510 
511         // Reset it prior to handing it out
512         executorQueue.resetElement();
513         return executorQueue;
514     }
515 
516     // The value returned from this call must eventually be released.
517     public AsyncExecutorImpl getAsyncExecutorForConnection() throws HBqlException {
518 
519         if (!Utils.isValidString(this.getAsyncExecutorName()))
520             throw new HBqlException("Connection not assigned an AsyncExecutor name");
521 
522         this.validateAsyncExecutorNameExists(this.getAsyncExecutorName());
523 
524         final AsyncExecutor executor = AsyncExecutorManager.getAsyncExecutor(this.getAsyncExecutorName());
525 
526         return ((AsyncExecutorImpl)executor);
527     }
528 
529 
530     public boolean usesQueryExecutor() {
531         return Utils.isValidString(this.getQueryExecutorPoolName());
532     }
533 
534     public String getQueryExecutorPoolName() {
535         return this.queryExecutorPoolName;
536     }
537 
538     public void setQueryExecutorPoolName(final String poolName) {
539         this.queryExecutorPoolName = poolName;
540     }
541 
542     public String getAsyncExecutorName() {
543         return this.asyncExecutorName;
544     }
545 
546     public void setAsyncExecutorName(final String poolName) {
547         this.asyncExecutorName = poolName;
548     }
549 
550     public <T> HBatch<T> newHBatch() {
551         return new HBatch<T>(this);
552     }
553 
554     public void validateTableName(final String tableName) throws HBqlException {
555         if (!this.tableExists(tableName))
556             throw new HBqlException("Table not found: " + tableName);
557     }
558 
559     public void validateTableDisabled(final String tableName, final String action) throws HBqlException {
560         if (this.tableEnabled(tableName))
561             throw new HBqlException("Cannot " + action + " enabled table: " + tableName);
562     }
563 
564     public void validateFamilyExistsForTable(final String familyName, final String tableName) throws HBqlException {
565         if (!this.familyExistsForTable(familyName, tableName))
566             throw new HBqlException("Family " + familyName + " not defined for table " + tableName);
567     }
568 
569     public void validateIndexExistsForTable(final String indexName, final String tableName) throws HBqlException {
570         if (!this.indexExistsForTable(indexName, tableName))
571             throw new HBqlException("Index " + indexName + " not defined for table " + tableName);
572     }
573 
574     public void validateQueryExecutorPoolNameExists(final String poolName) throws HBqlException {
575         if (!QueryExecutorPoolManager.queryExecutorPoolExists(poolName))
576             throw new HBqlException("QueryExecutorPool " + poolName + " does not exist.");
577     }
578 
579     public void validateAsyncExecutorNameExists(final String name) throws HBqlException {
580         if (!AsyncExecutorManager.asyncExecutorExists(name))
581             throw new HBqlException("AsyncExecutor " + name + " does not exist.");
582     }
583 }