数据库专题训练 Lab 4


Lab 4 查询处理

基础功能

Limit 算子

/* executors/limit_executor.h */
class LimitExecutor : public Executor {
 private:
  uint32_t limit_;
  uint32_t offset_;
};
/* executors/limit_executor.cpp */
std::shared_ptr<Record> LimitExecutor::Next() {
  // 通过 plan_ 获取 limit 语句中的 offset 和 limit 值
  auto offset = plan_->limit_offset_.value_or(0);
  auto limit = plan_->limit_count_.value_or(UINT32_MAX);
  while (auto record = children_[0]->Next()) {
    // 设置开始输出的记录位置
    if (offset_++ < offset) {
      continue;
    }
    // 限制输出的记录数
    if (limit_++ < limit) {
      return record;
    }
    break;
  }
  return nullptr;
}

排序算子

/* executors/orderby_executor.h */
struct order_pair {
  std::vector<std::pair<Value, OrderByType>> values_;
  std::shared_ptr<Record> record_;
};

bool compare(const order_pair &a, const order_pair &b);

class OrderByExecutor : public Executor {
 private:
  std::vector<order_pair> records_;
  bool sorted_ = false;
};
/* executors/orderby_executor.cpp */
bool compare(const order_pair &a, const order_pair &b) {
  // 根据 OrderByType 进行排序,传入 std::sort 函数
  for (size_t i = 0; i < a.values_.size(); i++) {
    // 通过 Value 的 Less, Equal, Greater 函数比较 Value 的值
    if (a.values_[i].first.Equal(b.values_[i].first)) {
      continue;
    }
    if (a.values_[i].second == OrderByType::DESC) {
      return a.values_[i].first.Less(b.values_[i].first);
    }
    else {
      return a.values_[i].first.Greater(b.values_[i].first);
    }
  }
  return false;
}

std::shared_ptr<Record> OrderByExecutor::Next() {
  // 排序阶段
  if (!sorted_) {
    auto records = std::vector<order_pair>();
    while (auto record = children_[0]->Next()) {
      std::vector<std::pair<Value, OrderByType>> values;
      for (auto &order_by: plan_->order_bys_) {
        // 通过 OperatorExpression 的 Evaluate 函数获取 Value 的值
        auto value = order_by.second->Evaluate(record);
        auto type = order_by.first;
        values.push_back(std::make_pair(value, type));
      }
      records.push_back(order_pair{values, record});
    }
    // 使用 STL 的 sort 函数
    std::sort(records.begin(), records.end(), compare);
    records_ = std::move(records);
    sorted_ = true;
  }
  // 输出阶段
  if (records_.size() > 0) {
    auto record = records_.back().record_;
    records_.pop_back();
    return record;
  }
  return nullptr;
}

嵌套循环连接算子

/* executors/nested_loop_join_executor.h */
class NestedLoopJoinExecutor : public Executor {
 private:
  std::vector<std::shared_ptr<Record>> first_records_;
  std::vector<std::shared_ptr<Record>> second_records_;
  std::vector<std::shared_ptr<Record>> records_;
  bool joined_ = false;
};
/* executors/nested_loop_join_executor.cpp */
std::shared_ptr<Record> NestedLoopJoinExecutor::Next() {
  // 连接阶段
  if (!joined_) {
    while (auto record = children_[0]->Next()) {
      first_records_.push_back(record);
    }
    while (auto record = children_[1]->Next()) {
      second_records_.push_back(record);
    }
    // 二重嵌套循环连接
    for (auto &first_record : first_records_) {
      for (auto &second_record : second_records_) {
        // 从 NestedLoopJoinOperator 中获取连接条件
        if (plan_->join_type_ == JoinType::INNER) {
          // 使用 EvaluateJoin 函数判断是否满足 join 条件
          auto value = plan_->join_condition_->EvaluateJoin(first_record, second_record).GetValue<bool>();
          if (value) {
            // 使用 Record 的 Append 函数进行记录的连接
            auto record = std::make_shared<Record>();
            record->Append(*first_record);
            record->Append(*second_record);
            // 将连接后的记录加入到 records_ 中
            records_.push_back(record);
          }
        }
      }
    }
    joined_ = true;
  }
  // 输出阶段
  if (records_.size() > 0) {
    auto record = records_.back();
    records_.pop_back();
    return record;
  }
  return nullptr;
}

归并连接算子

/* executors/merge_join_executor.h */
class MergeJoinExecutor : public Executor {
 private:
  std::list<std::shared_ptr<Record>> first_records_;
  std::list<std::shared_ptr<Record>> second_records_;
  std::list<std::shared_ptr<Record>> records_;
  bool joined_ = false;
};
/* executors/merge_join_executor.cpp */
std::shared_ptr<Record> MergeJoinExecutor::Next() {
  // 连接阶段
  if (!joined_) {
    while (auto record = children_[0]->Next()) {
      first_records_.push_back(record);
    }
    while (auto record = children_[1]->Next()) {
      second_records_.push_back(record);
    }
    if (plan_->join_type_ == JoinType::INNER) {
      auto first_record = first_records_.front();
      auto second_record = second_records_.front();
      while (first_records_.size() > 0 && second_records_.size() > 0) {
        // 不满足 join 条件,跳过相应记录
        if (plan_->left_key_->Evaluate(first_record).Less(plan_->right_key_->Evaluate(second_record))) {
          first_records_.pop_front();
          first_record = first_records_.front();
          continue;
        }
        // 不满足 join 条件,跳过相应记录
        if (plan_->left_key_->Evaluate(first_record).Greater(plan_->right_key_->Evaluate(second_record))) {
          second_records_.pop_front();
          second_record = second_records_.front();
          continue;
        }
        // 满足 join 条件
        while (plan_->left_key_->Evaluate(first_record).Equal(plan_->right_key_->Evaluate(second_record))) {
          auto second_record_iter = second_records_.begin();
          // 查询重复元组
          while (plan_->left_key_->Evaluate(first_record).Equal(plan_->right_key_->Evaluate(*second_record_iter))) {
            // 使用 Record 的 Append 函数进行记录的连接
            auto record = std::make_shared<Record>();
            record->Append(*first_record);
            record->Append(**second_record_iter);
            // 将连接后的记录加入到 records_ 中
            records_.push_back(record);
            // 遍历 second_records_ 直到为空
            if (++second_record_iter == second_records_.end()) {
              break;
            }
          }
          // 遍历 first_records_ 直到为空
          first_records_.pop_front();
          if (first_records_.size() == 0) {
            break;
          }
          first_record = first_records_.front();
        }
      }
    }
    joined_ = true;
  }
  // 输出阶段
  if (records_.size() > 0) {
    auto record = records_.back();
    records_.pop_back();
    return record;
  }
  return nullptr;
}

高级功能

外连接

设计思路

使用 bool 数组标记循环中外表与内表中的每条记录的连接情况. 循环完毕后, 从 NestedLoopJoinOperator 中获取连接条件, 对于外连接情形分别使用 null 补全未被连接的记录.

新增代码

文件 src/executors/nested_loop_join_executor.cpp.

测例展示

测试用例位于 test/lab4/50-nested-loop-outer-join.test, 设置上完全复用了 test/lab4/30-nested-loop-join.test, 只修改连接条件为 left join / right join / full join.

> make debug && make lab4/50

内存哈希连接

设计思路

src/executors/hash_join_executor.h 中添加成员变量:

class HashJoinExecutor : public Executor {
 private:
  std::shared_ptr<const HashJoinOperator> plan_;
  std::hash<Value> hasher;
  std::unordered_map<size_t, std::vector<std::shared_ptr<Record>>> first_records_;
  std::unordered_map<size_t, std::vector<std::shared_ptr<Record>>> second_records_;
  std::vector<std::shared_ptr<Record>> records_;
  bool joined_ = false;
};

划分阶段: 按 hasher 的哈希值对两表的记录进行分组, 哈希值相同的记录被划分到 unordered_map 的一个桶里进行连接.

探查阶段: 对记录进行嵌套循环连接, 并输出到 records_ 中.

新增代码

文件 src/executors/hash_join_executor.cpp.

测例展示

测试用例位于 test/lab4/60-hash-loop-join.test, 完全复用了 test/lab4/30-nested-loop-join.test, 只修改连接设置为 hash.

> make debug && make lab4/60

分组聚合算子

设计思路

首先对记录进行哈希分组:

/* src/executors/aggregate_executor.cpp */
while (auto record = children_[0]->Next()) {
  uint64_t hash = 0;
  for (auto &group_by : plan_->group_bys_) {
    hash *= 31;
    hash += hasher(group_by->Evaluate(record));
    auto col_value = dynamic_cast<ColumnValue *>(group_by.get());
  }
  hash %= 100000;
  hash_records_[hash].push_back(record);
}

在分组后遍历组内的每条记录以实现 min, max, sum, avg 聚合函数, 依据记录字段是否为空实现 count 聚合函数, count(*) 聚合函数直接返回记录总数.

新增代码

文件 src/executors/aggregate_executor.cpp.

测例展示

测试用例位于 test/lab4/70-aggregate.test.

> make debug && make lab4/70

分别进行了如下测试:

select id, room, max(score) from test_group_by group by id, room
select id, room, sum(score) from test_group_by group by id, room
select id, room, avg(score) from test_group_by group by id, room
select id, room, count(score) from test_group_by group by id, room
------------------------------------------------------------------
select count(id) from test_count
select count(distinct id) from test_count
select count(*) from test_count

实验耗时

  1. 基础功能
    • Limit 算子: 0.5h.
    • 排序算子: 0.5h.
    • 嵌套循环连接算子: 0.5h.
    • 归并连接算子: 1h.
  2. 高级功能
    • 外连接: 2.5h.
    • 内存哈希连接与分组聚合算子: 4h.

文章作者: Chengsx
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 Chengsx !
  目录