1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21 package org.apache.hadoop.hbase.hbql.impl;
22
23 import org.apache.hadoop.conf.Configuration;
24 import org.apache.hadoop.hbase.HBaseConfiguration;
25 import org.apache.hadoop.hbase.HColumnDescriptor;
26 import org.apache.hadoop.hbase.HTableDescriptor;
27 import org.apache.hadoop.hbase.MasterNotRunningException;
28 import org.apache.hadoop.hbase.ZooKeeperConnectionException;
29 import org.apache.hadoop.hbase.client.HBaseAdmin;
30 import org.apache.hadoop.hbase.client.HTablePool;
31 import org.apache.hadoop.hbase.client.tableindexed.IndexSpecification;
32 import org.apache.hadoop.hbase.client.tableindexed.IndexedTable;
33 import org.apache.hadoop.hbase.client.tableindexed.IndexedTableAdmin;
34 import org.apache.hadoop.hbase.client.tableindexed.IndexedTableDescriptor;
35 import org.apache.hadoop.hbase.hbql.client.AsyncExecutor;
36 import org.apache.hadoop.hbase.hbql.client.AsyncExecutorManager;
37 import org.apache.hadoop.hbase.hbql.client.ExecutionResults;
38 import org.apache.hadoop.hbase.hbql.client.HBatch;
39 import org.apache.hadoop.hbase.hbql.client.HBqlException;
40 import org.apache.hadoop.hbase.hbql.client.HConnection;
41 import org.apache.hadoop.hbase.hbql.client.HMapping;
42 import org.apache.hadoop.hbase.hbql.client.HPreparedStatement;
43 import org.apache.hadoop.hbase.hbql.client.HRecord;
44 import org.apache.hadoop.hbase.hbql.client.HResultSet;
45 import org.apache.hadoop.hbase.hbql.client.HStatement;
46 import org.apache.hadoop.hbase.hbql.client.QueryExecutorPool;
47 import org.apache.hadoop.hbase.hbql.client.QueryExecutorPoolManager;
48 import org.apache.hadoop.hbase.hbql.mapping.AnnotationResultAccessor;
49 import org.apache.hadoop.hbase.hbql.mapping.FamilyMapping;
50 import org.apache.hadoop.hbase.hbql.mapping.TableMapping;
51 import org.apache.hadoop.hbase.hbql.statement.args.KeyInfo;
52 import org.apache.hadoop.hbase.hbql.statement.args.WithArgs;
53 import org.apache.hadoop.hbase.hbql.util.AtomicReferences;
54 import org.apache.hadoop.hbase.hbql.util.Maps;
55 import org.apache.hadoop.hbase.hbql.util.PoolableElement;
56 import org.apache.hadoop.hbase.hbql.util.Sets;
57 import org.apache.hadoop.hbase.util.Bytes;
58
59 import java.io.IOException;
60 import java.util.List;
61 import java.util.Map;
62 import java.util.Set;
63 import java.util.concurrent.atomic.AtomicBoolean;
64 import java.util.concurrent.atomic.AtomicReference;
65
66 public class HConnectionImpl extends PoolableElement<HConnectionImpl> implements HConnection {
67
68 public static final String MAXTABLEREFS = "maxtablerefs";
69 public static final String MASTER = "hbase.master";
70
71 private final AtomicBoolean atomicClosed = new AtomicBoolean(false);
72 private final Configuration configuration;
73 private final HTablePool tablePool;
74 private final int maxTablePoolReferencesPerTable;
75 private final MappingManager mappingManager;
76
77 private final AtomicReference<Map<Class, AnnotationResultAccessor>> atomicAnnoMapping = AtomicReferences.newAtomicReference();
78 private final AtomicReference<HBaseAdmin> atomicHbaseAdmin = AtomicReferences.newAtomicReference();
79 private final AtomicReference<IndexedTableAdmin> atomicIndexTableAdmin = AtomicReferences.newAtomicReference();
80
81 private String queryExecutorPoolName = null;
82 private String asyncExecutorName = null;
83
84 public HConnectionImpl(final Configuration configuration,
85 final HConnectionPoolImpl connectionPool,
86 final int maxTablePoolReferencesPerTable) throws HBqlException {
87 super(connectionPool);
88 this.configuration = (configuration == null) ? HBaseConfiguration.create() : configuration;
89 this.maxTablePoolReferencesPerTable = maxTablePoolReferencesPerTable;
90 this.tablePool = new HTablePool(this.getConfiguration(), this.getMaxTablePoolReferencesPerTable());
91 this.mappingManager = new MappingManager(this);
92
93 this.getMappingManager().validatePersistentMetadata();
94 }
95
96 public static Configuration getConfiguration(final String master) {
97 final Configuration configuration = new Configuration();
98 configuration.set(HConnectionImpl.MASTER, master);
99 return HBaseConfiguration.create(configuration);
100 }
101
102 private AtomicReference<Map<Class, AnnotationResultAccessor>> getAtomicAnnoMapping() {
103 return this.atomicAnnoMapping;
104 }
105
106 public void resetElement() {
107
108 try {
109 this.getAtomicClosed().set(false);
110 this.getMappingManager().clear();
111 }
112 catch (HBqlException e) {
113 e.printStackTrace();
114 }
115
116 if (this.getAtomicAnnoMapping().get() != null)
117 this.getAtomicAnnoMapping().get().clear();
118
119 this.setQueryExecutorPoolName(null);
120 }
121
122 public boolean isPooled() {
123 return this.getElementPool() != null;
124 }
125
126 public Configuration getConfiguration() {
127 return this.configuration;
128 }
129
130 private HTablePool getTablePool() {
131 return this.tablePool;
132 }
133
134 private MappingManager getMappingManager() throws HBqlException {
135 this.checkIfClosed();
136 return this.mappingManager;
137 }
138
139 public int getMaxTablePoolReferencesPerTable() {
140 return this.maxTablePoolReferencesPerTable;
141 }
142
143 private Map<Class, AnnotationResultAccessor> getAnnotationMappingMap() {
144 if (this.getAtomicAnnoMapping().get() == null) {
145 synchronized (this) {
146 if (this.getAtomicAnnoMapping().get() == null) {
147 final Map<Class, AnnotationResultAccessor> newmap = Maps.newConcurrentHashMap();
148 this.getAtomicAnnoMapping().set(newmap);
149 }
150 }
151 }
152 return this.getAtomicAnnoMapping().get();
153 }
154
155 public AnnotationResultAccessor getAnnotationMapping(final Object obj) throws HBqlException {
156 return this.getAnnotationMapping(obj.getClass());
157 }
158
159 public synchronized AnnotationResultAccessor getAnnotationMapping(final Class<?> clazz) throws HBqlException {
160
161 AnnotationResultAccessor accessor = getAnnotationMappingMap().get(clazz);
162
163 if (accessor != null) {
164 return accessor;
165 }
166 else {
167 accessor = AnnotationResultAccessor.newAnnotationMapping(this, clazz);
168 getAnnotationMappingMap().put(clazz, accessor);
169 return accessor;
170 }
171 }
172
173 private AtomicReference<HBaseAdmin> getAtomicHbaseAdmin() {
174 return this.atomicHbaseAdmin;
175 }
176
177 public HBaseAdmin getHBaseAdmin() throws HBqlException {
178 this.checkIfClosed();
179 if (this.getAtomicHbaseAdmin().get() == null) {
180 synchronized (this) {
181 if (this.getAtomicHbaseAdmin().get() == null) {
182 try {
183 this.getAtomicHbaseAdmin().set(new HBaseAdmin(this.getConfiguration()));
184 }
185 catch (MasterNotRunningException e) {
186 throw new HBqlException(e);
187 }
188 catch (ZooKeeperConnectionException e) {
189 throw new HBqlException(e);
190 }
191 }
192 }
193 }
194 return this.getAtomicHbaseAdmin().get();
195 }
196
197 private AtomicReference<IndexedTableAdmin> getAtomicIndexTableAdmin() {
198 return this.atomicIndexTableAdmin;
199 }
200
201 public IndexedTableAdmin getIndexTableAdmin() throws HBqlException {
202 this.checkIfClosed();
203 if (this.getAtomicIndexTableAdmin().get() == null) {
204 synchronized (this) {
205 if (this.getAtomicIndexTableAdmin().get() == null) {
206 try {
207 this.getAtomicIndexTableAdmin().set(new IndexedTableAdmin(this.getConfiguration()));
208 }
209 catch (MasterNotRunningException e) {
210 throw new HBqlException(e);
211 }
212 catch (ZooKeeperConnectionException e) {
213 throw new HBqlException(e);
214 }
215 }
216 }
217 }
218 return this.getAtomicIndexTableAdmin().get();
219 }
220
221 public Set<String> getFamilyNames(final String tableName) throws HBqlException {
222 this.checkIfClosed();
223 final HTableDescriptor table = this.getHTableDescriptor(tableName);
224 final Set<String> familySet = Sets.newHashSet();
225 for (final HColumnDescriptor descriptor : table.getColumnFamilies())
226 familySet.add(Bytes.toString(descriptor.getName()));
227 return familySet;
228 }
229
230 public boolean familyExistsForTable(final String familyName, final String tableName) throws HBqlException {
231 final Set<String> names = this.getFamilyNames(tableName);
232 return names.contains(familyName);
233 }
234
235 public boolean familyExistsForMapping(final String familyName, final String mappingName) throws HBqlException {
236 final TableMapping mapping = this.getMapping(mappingName);
237 return mapping.containsFamily(familyName);
238 }
239
240 public IndexedTableDescriptor newIndexedTableDescriptor(final String tableName) throws HBqlException {
241 this.checkIfClosed();
242 try {
243 final HTableDescriptor tableDesc = this.getHTableDescriptor(tableName);
244 return new IndexedTableDescriptor(tableDesc);
245 }
246 catch (IOException e) {
247 throw new HBqlException(e);
248 }
249 }
250
251 public boolean indexExistsForMapping(final String indexName, final String mappingName) throws HBqlException {
252 final TableMapping mapping = this.getMapping(mappingName);
253 return this.indexExistsForTable(indexName, mapping.getTableName());
254 }
255
256 public IndexSpecification getIndexForTable(final String indexName, final String tableName) throws HBqlException {
257 final IndexedTableDescriptor indexDesc = this.newIndexedTableDescriptor(tableName);
258 return indexDesc.getIndex(indexName);
259 }
260
261 public boolean indexExistsForTable(final String indexName, final String tableName) throws HBqlException {
262 this.checkIfClosed();
263 final IndexSpecification index = this.getIndexForTable(indexName, tableName);
264 return index != null;
265 }
266
267 public void dropIndexForMapping(final String indexName, final String mappingName) throws HBqlException {
268 final TableMapping mapping = this.getMapping(mappingName);
269 this.dropIndexForTable(mapping.getTableName(), indexName);
270 }
271
272 public void dropIndexForTable(final String tableName, final String indexName) throws HBqlException {
273 this.validateIndexExistsForTable(indexName, tableName);
274 try {
275 final IndexedTableAdmin ita = this.getIndexTableAdmin();
276 ita.removeIndex(tableName.getBytes(), indexName);
277 }
278 catch (IOException e) {
279 throw new HBqlException(e);
280 }
281 }
282
283 public HStatement createStatement() throws HBqlException {
284 this.checkIfClosed();
285 return new HStatementImpl(this);
286 }
287
288 public HPreparedStatement prepareStatement(final String sql) throws HBqlException {
289 this.checkIfClosed();
290 return new HPreparedStatementImpl(this, sql);
291 }
292
293 public void releaseElement() {
294 this.getElementPool().release(this);
295 }
296
297 private AtomicBoolean getAtomicClosed() {
298 return this.atomicClosed;
299 }
300
301 public boolean isClosed() {
302 return this.getAtomicClosed().get();
303 }
304
305 public synchronized void close() {
306 if (!this.isClosed()) {
307 this.getAtomicClosed().set(true);
308
309 if (this.isPooled())
310 this.releaseElement();
311 }
312 }
313
314 private void checkIfClosed() throws HBqlException {
315 if (this.isClosed())
316 throw new HBqlException("Connection is closed");
317 }
318
319 public ExecutionResults execute(final String sql) throws HBqlException {
320 final HStatement stmt = this.createStatement();
321 return stmt.execute(sql);
322 }
323
324 public HResultSet<HRecord> executeQuery(final String sql) throws HBqlException {
325 final HStatement stmt = this.createStatement();
326 return stmt.executeQuery(sql);
327 }
328
329 public <T> HResultSet<T> executeQuery(final String sql, final Class clazz) throws HBqlException {
330 final HStatement stmt = this.createStatement();
331 return stmt.executeQuery(sql, clazz);
332 }
333
334 public List<HRecord> executeQueryAndFetch(final String sql) throws HBqlException {
335 final HStatement stmt = this.createStatement();
336 return stmt.executeQueryAndFetch(sql);
337 }
338
339 public <T> List<T> executeQueryAndFetch(final String sql, final Class clazz) throws HBqlException {
340 final HStatement stmt = this.createStatement();
341 return stmt.executeQueryAndFetch(sql, clazz);
342 }
343
344 public ExecutionResults executeUpdate(final String sql) throws HBqlException {
345 final HStatement stmt = this.createStatement();
346 return stmt.executeUpdate(sql);
347 }
348
349
350 public boolean mappingExists(final String mappingName) throws HBqlException {
351 return this.getMappingManager().mappingExists(mappingName);
352 }
353
354 public TableMapping getMapping(final String mappingName) throws HBqlException {
355 return this.getMappingManager().getMapping(mappingName);
356 }
357
358 public boolean dropMapping(final String mappingName) throws HBqlException {
359 return this.getMappingManager().dropMapping(mappingName);
360 }
361
362 public Set<HMapping> getAllMappings() throws HBqlException {
363 return this.getMappingManager().getAllMappings();
364 }
365
366 public TableMapping createMapping(final boolean tempMapping,
367 final boolean systemMapping,
368 final String mappingName,
369 final String tableName,
370 final KeyInfo keyInfo,
371 final List<FamilyMapping> familyList) throws HBqlException {
372 return this.getMappingManager().createMapping(tempMapping,
373 systemMapping,
374 mappingName,
375 tableName,
376 keyInfo,
377 familyList);
378 }
379
380
381 public void createTable(final HTableDescriptor tableDesc) throws HBqlException {
382 this.checkIfClosed();
383 final String tableName = tableDesc.getNameAsString();
384 if (this.tableExists(tableName))
385 throw new HBqlException("Table already exists: " + tableName);
386
387 try {
388 this.getHBaseAdmin().createTable(tableDesc);
389 }
390 catch (IOException e) {
391 throw new HBqlException(e);
392 }
393 }
394
395 public HTableWrapper newHTableWrapper(final WithArgs withArgs, final String tableName) throws HBqlException {
396
397 this.checkIfClosed();
398
399 try {
400 if (withArgs != null && withArgs.hasAnIndex())
401 return new HTableWrapper(new IndexedTable(this.getConfiguration(), tableName.getBytes()), null);
402 else
403 return new HTableWrapper(this.getTablePool().getTable(tableName), this.getTablePool());
404 }
405 catch (IOException e) {
406 throw new HBqlException(e);
407 }
408 catch (RuntimeException e) {
409 throw new HBqlException("Invalid table name: " + tableName);
410 }
411 }
412
413 public boolean tableExists(final String tableName) throws HBqlException {
414 try {
415 return this.getHBaseAdmin().tableExists(tableName);
416 }
417 catch (MasterNotRunningException e) {
418 throw new HBqlException(e);
419 }
420 catch (IOException e) {
421 throw new HBqlException(e);
422 }
423 }
424
425 public HTableDescriptor getHTableDescriptor(final String tableName) throws HBqlException {
426 this.validateTableName(tableName);
427 try {
428 return this.getHBaseAdmin().getTableDescriptor(tableName.getBytes());
429 }
430 catch (IOException e) {
431 throw new HBqlException(e);
432 }
433 }
434
435 public boolean tableAvailable(final String tableName) throws HBqlException {
436 this.validateTableName(tableName);
437 try {
438 return this.getHBaseAdmin().isTableAvailable(tableName);
439 }
440 catch (IOException e) {
441 throw new HBqlException(e);
442 }
443 }
444
445 public boolean tableEnabled(final String tableName) throws HBqlException {
446 this.validateTableName(tableName);
447 try {
448 return this.getHBaseAdmin().isTableEnabled(tableName);
449 }
450 catch (IOException e) {
451 throw new HBqlException(e);
452 }
453 }
454
455 public void dropTable(final String tableName) throws HBqlException {
456 validateTableDisabled(tableName, "drop");
457 try {
458 final byte[] tableNameBytes = tableName.getBytes();
459 this.getHBaseAdmin().deleteTable(tableNameBytes);
460 }
461 catch (IOException e) {
462 throw new HBqlException(e);
463 }
464 }
465
466 public void disableTable(final String tableName) throws HBqlException {
467 try {
468 if (!this.tableEnabled(tableName))
469 throw new HBqlException("Cannot disable disabled table: " + tableName);
470 this.getHBaseAdmin().disableTable(tableName);
471 }
472 catch (IOException e) {
473 throw new HBqlException(e);
474 }
475 }
476
477 public void enableTable(final String tableName) throws HBqlException {
478 validateTableDisabled(tableName, "enable");
479 try {
480 this.getHBaseAdmin().enableTable(tableName);
481 }
482 catch (IOException e) {
483 throw new HBqlException(e);
484 }
485 }
486
487 public Set<String> getTableNames() throws HBqlException {
488 try {
489 final HBaseAdmin admin = this.getHBaseAdmin();
490 final Set<String> tableSet = Sets.newHashSet();
491 for (final HTableDescriptor table : admin.listTables())
492 tableSet.add(table.getNameAsString());
493 return tableSet;
494 }
495 catch (IOException e) {
496 throw new HBqlException(e);
497 }
498 }
499
500
501 public CompletionQueueExecutor getQueryExecutorForConnection() throws HBqlException {
502
503 if (!Utils.isValidString(this.getQueryExecutorPoolName()))
504 throw new HBqlException("Connection not assigned a QueryExecutorPool name");
505
506 this.validateQueryExecutorPoolNameExists(this.getQueryExecutorPoolName());
507
508 final QueryExecutorPool pool = QueryExecutorPoolManager.getQueryExecutorPool(this.getQueryExecutorPoolName());
509 final CompletionQueueExecutor executorQueue = ((QueryExecutorPoolImpl)pool).take();
510
511
512 executorQueue.resetElement();
513 return executorQueue;
514 }
515
516
517 public AsyncExecutorImpl getAsyncExecutorForConnection() throws HBqlException {
518
519 if (!Utils.isValidString(this.getAsyncExecutorName()))
520 throw new HBqlException("Connection not assigned an AsyncExecutor name");
521
522 this.validateAsyncExecutorNameExists(this.getAsyncExecutorName());
523
524 final AsyncExecutor executor = AsyncExecutorManager.getAsyncExecutor(this.getAsyncExecutorName());
525
526 return ((AsyncExecutorImpl)executor);
527 }
528
529
530 public boolean usesQueryExecutor() {
531 return Utils.isValidString(this.getQueryExecutorPoolName());
532 }
533
534 public String getQueryExecutorPoolName() {
535 return this.queryExecutorPoolName;
536 }
537
538 public void setQueryExecutorPoolName(final String poolName) {
539 this.queryExecutorPoolName = poolName;
540 }
541
542 public String getAsyncExecutorName() {
543 return this.asyncExecutorName;
544 }
545
546 public void setAsyncExecutorName(final String poolName) {
547 this.asyncExecutorName = poolName;
548 }
549
550 public <T> HBatch<T> newHBatch() {
551 return new HBatch<T>(this);
552 }
553
554 public void validateTableName(final String tableName) throws HBqlException {
555 if (!this.tableExists(tableName))
556 throw new HBqlException("Table not found: " + tableName);
557 }
558
559 public void validateTableDisabled(final String tableName, final String action) throws HBqlException {
560 if (this.tableEnabled(tableName))
561 throw new HBqlException("Cannot " + action + " enabled table: " + tableName);
562 }
563
564 public void validateFamilyExistsForTable(final String familyName, final String tableName) throws HBqlException {
565 if (!this.familyExistsForTable(familyName, tableName))
566 throw new HBqlException("Family " + familyName + " not defined for table " + tableName);
567 }
568
569 public void validateIndexExistsForTable(final String indexName, final String tableName) throws HBqlException {
570 if (!this.indexExistsForTable(indexName, tableName))
571 throw new HBqlException("Index " + indexName + " not defined for table " + tableName);
572 }
573
574 public void validateQueryExecutorPoolNameExists(final String poolName) throws HBqlException {
575 if (!QueryExecutorPoolManager.queryExecutorPoolExists(poolName))
576 throw new HBqlException("QueryExecutorPool " + poolName + " does not exist.");
577 }
578
579 public void validateAsyncExecutorNameExists(final String name) throws HBqlException {
580 if (!AsyncExecutorManager.asyncExecutorExists(name))
581 throw new HBqlException("AsyncExecutor " + name + " does not exist.");
582 }
583 }