What is Skewness?

Skewness happens when a specific value appears in a column far more often than the other values in that column's domain.

In other words, a few values repeat much more often than the rest within our dataset.
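To make the definition concrete, here is a minimal sketch that measures how much of a column a single value takes up. The sample data is made up for illustration and is not taken from a real table.

```python
from collections import Counter

# Hypothetical sample of a content_id column: value 1 dominates.
content_ids = [1] * 90 + [2, 3, 4, 5, 6, 7, 8, 9, 10, 11]

# Count how many rows each value has and pick the most frequent one.
counts = Counter(content_ids)
top_value, top_count = counts.most_common(1)[0]
top_share = top_count / len(content_ids)

print(f"most frequent value: {top_value}, share of rows: {top_share:.0%}")
```

A column where one value holds most of the rows, as in this sample, is what we call skewed.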

Why is this a problem?

The problem with skewness, especially in distributed systems, is that it can cause an unbalanced load across the nodes executing queries that involve the skewed value. This often occurs when the column is a join key or when it is part of the GROUP BY in aggregations. In such cases, a small subset of machines has to process all the rows for that specific value and, when dealing with large datasets, that takes a long time.

Let's consider the case of a JOIN. Imagine we have two tables, content_engaged and content_detail, that hold information about users interacting with specific types of content in a social-network kind of product. Each time a user views, likes, or comments on a piece of content, we register an entry in the content_engaged table.

SELECT
    A.content_id,
    A.content_type,
    B.content_text
FROM content_engaged A
JOIN content_detail B 
    ON (A.content_id = B.content_id)

When we submit this query to a distributed query engine, and depending on the type of join being performed (specifically, anything other than a broadcast join), the query engine distributes the rows from each table across executors. It does so by taking the key(s) that are part of the join clause and applying a modulus operation to the hash of the column value. If we have 100 executor machines to do the join, we get something like hash(A.content_id) % 100 for content_engaged and hash(B.content_id) % 100 for content_detail. This ensures that rows with the same content_id from both tables end up on the same machine, which makes the join possible.
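The routing rule can be sketched in a few lines of Python. This is only an illustration: executor_for and shuffle are hypothetical helper names, zlib.crc32 stands in for whatever hash function the engine really uses, and the rows are made up.

```python
import zlib

NUM_EXECUTORS = 100  # the 100 executor machines from the text


def executor_for(join_key, num_executors=NUM_EXECUTORS):
    """Map a join key to an executor index: hash(key) % num_executors.

    zlib.crc32 is an assumption, used only because it is deterministic.
    """
    return zlib.crc32(str(join_key).encode("utf-8")) % num_executors


def shuffle(rows):
    """Group rows by the executor that their content_id (first field) maps to."""
    buckets = {}
    for row in rows:
        buckets.setdefault(executor_for(row[0]), []).append(row)
    return buckets


# Hypothetical rows: (content_id, payload)
content_engaged = [(1, "view"), (2, "like"), (1, "comment"), (3, "view")]
content_detail = [(1, "text for 1"), (2, "text for 2"), (3, "text for 3")]

engaged_by_executor = shuffle(content_engaged)
detail_by_executor = shuffle(content_detail)

# Each executor sees the rows of both tables that share a content_id.
for executor, rows in sorted(engaged_by_executor.items()):
    print(executor, rows, detail_by_executor.get(executor, []))
```

Because both tables go through the same function, a given content_id always maps to the same executor index, no matter which table the row came from.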

Distributing values across executors during a join of 2 tables

In our image example, executor E1 will receive all the rows where content_id = 1 that come from table A as well as all the rows where content_id = 1 from table B. The same is true for E2 and E3: they will get rows for different values of content_id from each table.

Now, ask yourself: what will happen if we have a very popular piece of content, let's say content_id = 1, that millions of users engaged with in a given day? In that case, all the rows for the popular content will go to a single executor, which will have to process them one by one and will take more time to complete its task than the other executors that got fewer rows to process.
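A small simulation shows how lopsided the load gets. Everything here is an assumption made for illustration: the cluster size, the row counts, and the crc32-based routing (a stand-in for the engine's real hash function).

```python
import zlib
from collections import Counter
from statistics import median

NUM_EXECUTORS = 10  # small hypothetical cluster


def executor_for(content_id, num_executors=NUM_EXECUTORS):
    """Route a key to an executor with hash(key) % num_executors (crc32 assumed)."""
    return zlib.crc32(str(content_id).encode("utf-8")) % num_executors


# Hypothetical day of engagement: content_id 1 gets 1,000,000 rows,
# while content_ids 2..1000 get 100 rows each.
rows_per_content = {1: 1_000_000}
rows_per_content.update({cid: 100 for cid in range(2, 1001)})

# Add up how many rows each executor receives after the shuffle.
rows_per_executor = Counter({e: 0 for e in range(NUM_EXECUTORS)})
for content_id, row_count in rows_per_content.items():
    rows_per_executor[executor_for(content_id)] += row_count

hot_executor = executor_for(1)
loads = [rows_per_executor[e] for e in range(NUM_EXECUTORS)]

print("hot executor:", hot_executor, "rows:", rows_per_executor[hot_executor])
print("median rows per executor:", median(loads))
```

The executor that owns content_id = 1 receives at least a million rows, while all the other executors together share fewer than a hundred thousand, so the whole join waits on that one machine.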

Detecting Skewness

Now that we know about the theory and how skewness happens, let's see an example running a query against actual datasets in Spark.

First, let's create both tables:

CREATE TABLE content_engaged
(
  content_id BIGINT,
  user_id BIGINT,
  interaction VARCHAR,
  interaction_ts TIMESTAMP,
  ds INTEGER
)
WITH
(
    FORMAT = 'PARQUET',
    PARTITIONING = ARRAY['ds']
);

and

CREATE TABLE content_detail
(
  content_id BIGINT,
  type VARCHAR,
  owner_id BIGINT,
  created_ts TIMESTAMP
)
WITH
(
    FORMAT = 'PARQUET',
    PARTITIONING = ARRAY['type']
);

Spark offers a nice way to check the execution stats of each task. In particular, this table tells us how certain metrics are distributed across a few percentiles.
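To show how such a percentile summary exposes skew, here is a rough sketch that computes one by hand. The task durations are hypothetical, not measurements from a real job, and the five summary points (min, 25th percentile, median, 75th percentile, max) are an assumption about what the table reports.

```python
from statistics import quantiles

# Hypothetical task durations in seconds for one stage; the last task
# is the one that received the skewed key.
task_durations = [12, 14, 13, 15, 11, 14, 13, 12, 16, 480]


def summarize(values):
    """Return min, 25th percentile, median, 75th percentile, and max."""
    q25, q50, q75 = quantiles(values, n=4, method="inclusive")
    return {
        "min": min(values),
        "p25": q25,
        "median": q50,
        "p75": q75,
        "max": max(values),
    }


summary = summarize(task_durations)

# A max far above the median suggests a few tasks did most of the work.
skew_ratio = summary["max"] / summary["median"]

print(summary)
print(f"max / median = {skew_ratio:.1f}")
```

When the min, median, and 75th percentile sit close together but the max is many times larger, a handful of tasks are most likely handling a skewed key.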

