This is a very difficult thing to do. Very impressive. I have so many questions, but my number one is: were you able to evaluate alternatives to your existing vertical-scaling-based setup? For example CockroachDB, multi-master Postgres, sharding instead of a single DB, etc. At that database size, you are well past the point at which a more advanced DB technology would theoretically help you scale and simplify your architecture, so I'm curious why you didn't go that route.
(as a downvoter) Distributed transactions don't scale; they are NOT efficient. You can't co-partition data in CockroachDB, so the only way is the slow way. Interleaved data doesn't count.
Are you suggesting it hits scaling boundaries earlier than that single-node postgres? The TPC-C benchmark suggests fairly strong scaling up to medium-to-high double-digit node counts, and my understanding is that TPC-C is fairly conflict-prone and relies heavily on distributed transactions.
Of course a distributed architecture costs efficiency, but as long as it still scales further (and after some point, cheaper) than single-node alternatives, the efficiency loss is tolerable.
You can affect partitioning, but forcing it requires the paid version.
> Are you suggesting it hits scaling boundaries earlier than that single-node postgres?
This is the NoSQL scenario all over again. You have to think in terms of cost($)/query and your data layout.
> Of course a distributed architecture costs efficiency, but as long as it still scales further (and after some point, cheaper) than single-node alternatives, the efficiency loss is tolerable.
The efficiency loss is tolerable. The question is: when? For a primary-key get, the efficiency loss is, let's say, none, so you can go distributed from the start.
For other queries, especially multi-tenant ones where a whole tenant fits in one box, it may only start making sense AFTER scaling up to a 2TB-memory node.
Imagine a SELECT query doing a join. It has to read 1GB either from a local NVMe array or over the network.
Imagine a write that ends up as a distributed write: it has to wait for 2+ more servers over the network.
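To put rough numbers on the 1GB read, assuming (purely for illustration, not measured) a 10 Gbit/s network link and a local NVMe array with about 12 GB/s of aggregate read bandwidth:

```latex
t_{\text{net}} \approx \frac{1\,\text{GB}}{10\,\text{Gbit/s}} = \frac{8\,\text{Gbit}}{10\,\text{Gbit/s}} = 0.8\,\text{s},
\qquad
t_{\text{NVMe}} \approx \frac{1\,\text{GB}}{12\,\text{GB/s}} \approx 0.083\,\text{s}
```

Under those assumptions the network read is roughly 10x slower before counting any extra round trips, which is the gap a distributed join has to amortize.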
We thought about migrating to Citus, but I don't have a good idea of how to shard our dataset efficiently.
If we were to shard by user id, then creating a match between two people would require cross-shard transactions and joins. Sharding by geography is also tough because people move around pretty frequently.
- either it is SaaS, in which case it should make sense to shard by customer
- or it is something-to-consumer (social networking?), in which case I guess you'll have to take a step back and see if you can sacrifice one of your current assumptions
... but I feel I'm missing something, since what I'm saying feels a bit trivial.
Sharding a matching engine is indeed pretty hard, and requires redundancy and very deliberate data modelling choices.
That does seem like a fun exercise :).
The general rules of the game are:
- You can only scale up the throughput of queries/transactions that access a single shard (some percentage going to 2 shards can be OK).
- You can only scale down the response time of large operations that span multiple shards, since those get parallelized.
- You should only join distributed tables on their distribution column.
- You can join reference tables on any column.
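A small sketch of the join rules in practice, assuming the users and interests tables defined further down (both distributed by user ID and co-located). EXPLAIN is one way to check routing: a single-shard (router) query should show a Citus custom scan with a task count of 1, though the exact plan text varies between Citus versions.

```sql
-- Join on the distribution column with a filter on it:
-- runs shard-locally and should be planned as a single task.
EXPLAIN
SELECT *
FROM users u
JOIN interests i ON i.user_id_a = u.user_id
WHERE u.user_id = 240;

-- Join on a non-distribution column (interests is distributed by user_id_a):
-- rows have to be reshuffled between nodes, which Citus only plans when
-- repartition joins are enabled.
SET citus.enable_repartition_joins TO on;
SELECT count(*)
FROM users u
JOIN interests i ON i.user_id_b = u.user_id;
```

The second query still works, but it is exactly the kind of cross-node data movement the rules above tell you to keep off the hot path.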
The thing that comes to mind is to use a reference table for any user data that is used to find/score matches. Reference tables are replicated to every node and can be joined with distributed tables and each other using arbitrary join clauses, so joining by score or distance is not a problem, but you need to store the data multiple times.
One of the immediate benefits of reference tables is that reads can be load-balanced across the nodes, either by using a setting (citus.task_assignment_policy = 'round-robin') or using a distributed table as a routing/parallelization scheme.
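A minimal sketch of the setting route, assuming a Citus version where citus.task_assignment_policy accepts 'round-robin' (alongside 'greedy' and 'first-replica'):

```sql
-- Session-level: rotate reads of reference-table shards across the nodes
-- that hold a copy, instead of always using the same placement.
SET citus.task_assignment_policy TO 'round-robin';

-- Check the value in effect for this session.
SHOW citus.task_assignment_policy;

-- Repeated reads of a reference table such as profiles (created below)
-- can then be served by different nodes, e.g.:
-- SELECT profile FROM profiles WHERE user_id = 350;
```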
CREATE TABLE profiles (
user_id bigint primary key,
location geography, -- geography, so st_distance() returns meters
profile jsonb
);
SELECT create_reference_table('profiles');
CREATE TABLE users (
user_id bigint primary key references profiles (user_id),
name text,
email text
);
SELECT create_distributed_table('users', 'user_id');
-- replicate the match_score function (which must already exist) to all the nodes
SELECT create_distributed_function('match_score(jsonb,jsonb)');
-- look up profile of user 350, goes to 1 shard
SELECT * FROM users u, profiles p WHERE u.user_id = p.user_id AND u.user_id = 350;
-- find matches for user #240 within 5km, goes to 1 shard
SELECT b.user_id, match_score(a.profile, b.profile) AS score
FROM users u, profiles a, profiles b
WHERE u.user_id = 240 AND u.user_id = a.user_id AND b.user_id <> a.user_id
AND match_score(a.profile,b.profile) > 0.9 AND st_distance(a.location,b.location) < 5000
ORDER BY score DESC LIMIT 10;
The advantage of having the distributed users table in the join is mainly that you divide the work in a way that keeps each worker node's cache relatively hot for a specific subset of users, though you'll still be scanning most of the data to find matches.
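create_distributed_function only propagates a function that already exists, so match_score(jsonb, jsonb) has to be created before that call. The thread leaves the scoring logic open; the function below is a purely hypothetical placeholder (a Jaccard-style score: key/value pairs both profiles agree on, divided by the number of distinct keys in either profile). It assumes profiles are JSON objects and is symmetric in its arguments, so it is compatible with the "score is commutative" assumption used for batch generation.

```sql
-- Hypothetical placeholder scoring, range 0.0 .. 1.0, symmetric in (a, b).
-- Run this before SELECT create_distributed_function('match_score(jsonb,jsonb)').
CREATE OR REPLACE FUNCTION match_score(a jsonb, b jsonb)
RETURNS float8
LANGUAGE sql IMMUTABLE
AS $$
  SELECT coalesce(
    -- pairs (key, value) in a that b has with the same value
    (SELECT count(*) FROM jsonb_each(a) AS ka
      WHERE b -> ka.key = ka.value)::float8
    -- divided by all distinct keys present in either profile
    / nullif((SELECT count(*)
              FROM (SELECT jsonb_object_keys(a)
                    UNION
                    SELECT jsonb_object_keys(b)) AS all_keys), 0),
    0);
$$;
```

Two identical profiles score 1.0; profiles with no agreeing keys score 0.0. Any real scoring function works the same way with Citus as long as it exists on the coordinator before it is distributed.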
Where it gets a bit more interesting is if your dating site is opinionated / does not let you search, since you can then generate matches upfront in batches in parallel.
CREATE TABLE match_candidates (
user_id_a bigint references profiles (user_id),
user_id_b bigint references profiles (user_id),
score float,
primary key (user_id_a, user_id_b)
);
SELECT create_distributed_table('match_candidates', 'user_id_a', colocate_with :='users');
-- generate match candidates for all users in a distributed, parallel fashion
-- will generate a match candidate in both directions, assuming score is commutative
INSERT INTO match_candidates (user_id_a, user_id_b, score)
SELECT u.user_id, b.user_id, match_score(a.profile,b.profile) AS score
FROM users u, profiles a, profiles b
WHERE u.user_id = a.user_id AND b.user_id <> a.user_id
AND match_score(a.profile,b.profile) > 0.9 AND st_distance(a.location,b.location) < 5000;
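A trailing ORDER BY ... LIMIT on an INSERT ... SELECT applies to the whole result, not to each user. If you only want to keep the best few candidates per user, one option is a window function partitioned by the distribution column, which Citus can generally push down to the shards. A sketch, with 10 as an arbitrary cutoff:

```sql
-- Keep at most 10 candidates per user (the cutoff is arbitrary).
-- PARTITION BY the distribution column (users.user_id), so each shard
-- can rank its own users without moving data between nodes.
INSERT INTO match_candidates (user_id_a, user_id_b, score)
SELECT user_id_a, user_id_b, score
FROM (
  SELECT u.user_id AS user_id_a,
         b.user_id AS user_id_b,
         match_score(a.profile, b.profile) AS score,
         row_number() OVER (PARTITION BY u.user_id
                            ORDER BY match_score(a.profile, b.profile) DESC) AS pos
  FROM users u
  JOIN profiles a ON a.user_id = u.user_id
  JOIN profiles b ON b.user_id <> a.user_id
  WHERE match_score(a.profile, b.profile) > 0.9
    AND st_distance(a.location, b.location) < 5000
) AS ranked
WHERE pos <= 10;
```

Note that with a per-user cutoff the candidates are no longer guaranteed to exist in both directions: B can be in A's top 10 without A being in B's.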
For interests/matches, it might make sense to have some redundancy in order to get reads that go to 1 shard as much as possible.
CREATE TABLE interests (
user_id_a bigint references profiles (user_id),
user_id_b bigint references profiles (user_id),
initiated_by_a bool,
mutual bool,
primary key (user_id_a, user_id_b)
);
SELECT create_distributed_table('interests', 'user_id_a', colocate_with :='users');
-- 240 is interested in 350, insert into 2 shards (uses 2PC)
BEGIN;
INSERT INTO interests VALUES (240, 350, true, false);
INSERT INTO interests VALUES (350, 240, false, false);
END;
-- people interested in #350, goes to 1 shard
SELECT * FROM interests JOIN profiles ON (user_id_b = user_id) WHERE user_id_a = 350 AND NOT initiated_by_a;
-- it's a match! update 2 shards (uses 2PC)
BEGIN;
UPDATE interests SET mutual = true WHERE user_id_a = 240 AND user_id_b = 350;
UPDATE interests SET mutual = true WHERE user_id_a = 350 AND user_id_b = 240;
END;
-- people #240 is matched with, goes to 1 shard
SELECT * FROM interests JOIN profiles ON (user_id_b = user_id) WHERE user_id_a = 240 AND mutual;
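To see why the paired writes above touch two shards, you can ask Citus which shard a distribution value maps to. get_shard_id_for_distribution_column is a Citus utility function; the citus_shards view is available in Citus 10 and later (older versions expose the same information through the pg_dist_* metadata tables).

```sql
-- Which shard of interests (and, by co-location, users) holds each user?
SELECT get_shard_id_for_distribution_column('interests', 240) AS shard_of_240,
       get_shard_id_for_distribution_column('interests', 350) AS shard_of_350;

-- Which worker node each of those shards lives on.
SELECT shardid, nodename, nodeport
FROM citus_shards
WHERE table_name = 'interests'::regclass
  AND shardid IN (get_shard_id_for_distribution_column('interests', 240),
                  get_shard_id_for_distribution_column('interests', 350));
```

If both IDs happen to map to the same shard, the "it's a match" transaction stays on a single shard; in general, two random users land on different shards.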
For data related to a specific match, you can perhaps use the smallest user ID as the distribution column to avoid the redundancy.
CREATE TABLE messages (
user_id_a bigint,
user_id_b bigint,
from_a bool,
message_text text,
message_time timestamptz default now(),
message_id bigserial,
primary key (user_id_a, user_id_b, message_id),
foreign key (user_id_a, user_id_b) references interests (user_id_a, user_id_b) on delete cascade
);
SELECT create_distributed_table('messages', 'user_id_a', colocate_with :='interests');
-- user 350 sends a message to 240, goes to 1 shard
INSERT INTO messages VALUES (240, 350, false, 'hi #240!');
-- user 240 sends a message to 350, goes to 1 shard
INSERT INTO messages VALUES (240, 350, true, 'hi!');
-- user 240 looks at chat with user 350, goes to 1 shard
SELECT from_a, message_text, message_time
FROM messages
WHERE user_id_a = 240 AND user_id_b = 350
ORDER BY message_time DESC LIMIT 100;
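Because each pair is stored smallest ID first, the application has to normalize (sender, recipient) on every read and write. A sketch using psql variables (sender and recipient are hypothetical names), with least()/greatest() doing the ordering and from_a derived from whether the sender has the smaller ID. It assumes the interests row (240, 350) exists, as inserted earlier, since messages has a foreign key to it.

```sql
-- Hypothetical psql variables for the two participants.
\set sender 350
\set recipient 240

-- Pair stored smallest ID first; from_a is true when the sender is user_id_a.
-- The distribution value is a constant expression, so this is expected
-- to remain a single-shard write.
INSERT INTO messages (user_id_a, user_id_b, from_a, message_text)
VALUES (least(:sender, :recipient), greatest(:sender, :recipient),
        :sender < :recipient, 'hi #240!');

-- Reading the conversation uses the same normalization.
SELECT from_a, message_text, message_time
FROM messages
WHERE user_id_a = least(:sender, :recipient)
  AND user_id_b = greatest(:sender, :recipient)
ORDER BY message_time DESC LIMIT 100;
```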
This exercise goes on for a while. You still get the benefits of PostgreSQL and the ability to scale up the throughput of common operations or scale down the response time of batch operations, but it does require careful data-modeling choices.
(Citus engineer who enjoys distributed systems puzzles)