View Javadoc

1   /*
2    * Copyright (c) 2010.  The Apache Software Foundation
3    *
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *     http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing, software
15   * distributed under the License is distributed on an "AS IS" BASIS,
16   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17   * See the License for the specific language governing permissions and
18   * limitations under the License.
19   */
20  
21  package org.apache.hadoop.hbase.hbql.client;
22  
23  import org.apache.hadoop.hbase.client.Delete;
24  import org.apache.hadoop.hbase.client.Put;
25  import org.apache.hadoop.hbase.hbql.impl.BatchAction;
26  import org.apache.hadoop.hbase.hbql.impl.DeleteAction;
27  import org.apache.hadoop.hbase.hbql.impl.HConnectionImpl;
28  import org.apache.hadoop.hbase.hbql.impl.HRecordImpl;
29  import org.apache.hadoop.hbase.hbql.impl.HTableWrapper;
30  import org.apache.hadoop.hbase.hbql.impl.InsertAction;
31  import org.apache.hadoop.hbase.hbql.mapping.AnnotationResultAccessor;
32  import org.apache.hadoop.hbase.hbql.mapping.ColumnAttrib;
33  import org.apache.hadoop.hbase.hbql.mapping.ResultAccessor;
34  import org.apache.hadoop.hbase.hbql.mapping.TableMapping;
35  import org.apache.hadoop.hbase.hbql.util.Lists;
36  import org.apache.hadoop.hbase.hbql.util.Maps;
37  
38  import java.io.IOException;
39  import java.util.List;
40  import java.util.Map;
41  
42  public class HBatch<T> {
43  
44      private final HConnection connection;
45  
46      private final Map<String, List<BatchAction>> actionMap = Maps.newConcurrentHashMap();
47  
48      public HBatch(final HConnection connection) {
49          this.connection = connection;
50      }
51  
52      public HConnection getConnection() {
53          return this.connection;
54      }
55  
56      private HConnectionImpl getHConnectionImpl() {
57          return (HConnectionImpl)this.getConnection();
58      }
59  
60      public Map<String, List<BatchAction>> getActionMap() {
61          return this.actionMap;
62      }
63  
64      public synchronized List<BatchAction> getActionList(final String tableName) {
65          List<BatchAction> retval = this.getActionMap().get(tableName);
66          if (retval == null) {
67              retval = Lists.newArrayList();
68              this.getActionMap().put(tableName, retval);
69          }
70          return retval;
71      }
72  
73      public void insert(final T newrec) throws HBqlException {
74  
75          if (newrec instanceof HRecordImpl) {
76              final HRecordImpl record = (HRecordImpl)newrec;
77              final TableMapping tableMapping = record.getTableMapping();
78              final ColumnAttrib keyAttrib = tableMapping.getKeyAttrib();
79              if (!record.isCurrentValueSet(keyAttrib))
80                  throw new HBqlException("Record key value must be assigned");
81  
82              final Put put = this.createPut(record.getResultAccessor(), record);
83              this.getActionList(tableMapping.getTableName()).add(new InsertAction(put));
84          }
85          else {
86              final AnnotationResultAccessor accessor = this.getHConnectionImpl().getAnnotationMapping(newrec);
87              final Put put = this.createPut(accessor, newrec);
88              this.getActionList(accessor.getMapping().getTableName()).add(new InsertAction(put));
89          }
90      }
91  
92      public void delete(final T newrec) throws HBqlException {
93  
94          if (newrec instanceof HRecordImpl) {
95              final HRecordImpl record = (HRecordImpl)newrec;
96              final TableMapping tableMapping = record.getTableMapping();
97              final ColumnAttrib keyAttrib = tableMapping.getKeyAttrib();
98              if (!record.isCurrentValueSet(keyAttrib))
99                  throw new HBqlException("Record key value must be assigned");
100             this.delete(tableMapping, record);
101         }
102         else {
103             final AnnotationResultAccessor accessor = this.getHConnectionImpl().getAnnotationMapping(newrec);
104             this.delete(accessor.getTableMapping(), newrec);
105         }
106     }
107 
108     private void delete(TableMapping tableMapping, final Object newrec) throws HBqlException {
109         final ColumnAttrib keyAttrib = tableMapping.getKeyAttrib();
110         final byte[] keyval = keyAttrib.getValueAsBytes(newrec);
111         this.getActionList(tableMapping.getTableName()).add(new DeleteAction(new Delete(keyval)));
112     }
113 
114     private Put createPut(final ResultAccessor resultAccessor, final Object newrec) throws HBqlException {
115 
116         final TableMapping tableMapping = resultAccessor.getTableMapping();
117         final ColumnAttrib keyAttrib = resultAccessor.getKeyAttrib();
118 
119         final Put put;
120 
121         if (newrec instanceof HRecordImpl) {
122             final HRecordImpl record = (HRecordImpl)newrec;
123             final byte[] keyval = keyAttrib.getValueAsBytes(record);
124             put = new Put(keyval);
125 
126             for (final String family : tableMapping.getFamilySet()) {
127                 for (final ColumnAttrib attrib : tableMapping.getColumnAttribListByFamilyName(family)) {
128                     if (record.isCurrentValueSet(attrib)) {
129                         final byte[] b = attrib.getValueAsBytes(record);
130                         put.add(attrib.getFamilyNameAsBytes(), attrib.getColumnNameAsBytes(), b);
131                     }
132                 }
133             }
134         }
135         else {
136             final byte[] keyval = keyAttrib.getValueAsBytes(newrec);
137             put = new Put(keyval);
138             for (final String family : tableMapping.getFamilySet()) {
139                 for (final ColumnAttrib colattrib : tableMapping.getColumnAttribListByFamilyName(family)) {
140 
141                     // One extra lookup for annotations
142                     final ColumnAttrib attrib = resultAccessor.getColumnAttribByName(colattrib.getFamilyQualifiedName());
143                     final byte[] b = attrib.getValueAsBytes(newrec);
144                     put.add(attrib.getFamilyNameAsBytes(), attrib.getColumnNameAsBytes(), b);
145                 }
146             }
147         }
148         return put;
149     }
150 
151     public void apply() throws HBqlException {
152         try {
153             for (final String tableName : this.getActionMap().keySet()) {
154                 HTableWrapper tableref = null;
155                 try {
156                     tableref = this.getHConnectionImpl().newHTableWrapper(null, tableName);
157                     for (final BatchAction batchAction : this.getActionList(tableName))
158                         batchAction.apply(tableref.getHTable());
159                     tableref.getHTable().flushCommits();
160                     tableref.getHTable().close();
161                 }
162                 finally {
163                     // Release to table pool
164                     if (tableref != null)
165                         tableref.releaseHTable();
166                 }
167             }
168         }
169         catch (IOException e) {
170             throw new HBqlException(e);
171         }
172     }
173 }