Wednesday, October 8, 2008

Asynchronous SQL with Libevent and PgSQL

Ah, the holidays are upon us all! This means food, relaxation, sleeping, hanging out, eating, sleeping, no school, avoiding girls, and most importantly learning new ways to subvert the usage of pthreads.

While pontificating with my old buddy Dilton about the perils of programming with threads, he brought up the notion that databases and threads are like hamburgers without catsup. That is: you just can't have one without the other.

This idea that anyone would turn down a hamburger, with or without catsup, was foreign to me, so I scoffed, pointed my finger into the air, and asserted the falsehood of this statement. The reality was I had no idea what I was saying. With only 2 meals that morning and only 12 hours of sleep, I was delirious.

His point was that database calls are [in]famous for blocking on I/O: any time your software needs to query a database, the application hangs until the query has finished. That is, unless you thread the database connections.

"Back up your statements, Jug," said he.
"This day is the day that you shall rue, kind sir," said I.

I couldn't be wrong. I had to show Dilton that non-blocking database IO is a reality. I mean - I can't be the only one who has wanted this type of functionality, right?

Two corn dogs and a milkshake later, I hopped on over to the Riverdale Library to do some research and was surprised to find so little information on the subject. MySQL made reference to a non-blocking API years ago, but no real documentation could be found. Oracle can do this, but the functionality is stuffed into the raw OCI library with almost zero documentation. SQLite has something, but I wanted something more robust.

Dejected, I left the library. Hotdog (my faithful dog) was waiting for me outside. He raised an eyebrow, and with my hands in my pockets and my head lowered in rejection, we started home.

"Duh, why the long face Jughead?" someone said.

I stopped and looked up to see the round face of Moose. I perked up. While big Moose may not excel at day-to-day academics (he is a born athlete, you know), it's a little-known fact that he aspires to be a Unix administrator after his football career.

Knowing this tidbit of information, I asked him what all the cool kids are using these days for databases.

"Derr, I don't know, lemme thunk about that one. I sorta remember setting up a PostgreSQL database for Midge..." Moose said slowly.

"Egads, does the PostgreSQL C API have non-blocking functions?" I asked excitedly.

"Duh, I thunk I read somthin' about functions dat allow asynchronous query processing," Moose exclaimed.

I had heard all I needed to hear, and I ran home to set up a PostgreSQL instance on my workstation. What big Moose had said was true: there was an easy, documented way of handling database queries in a non-blocking manner!

There are 4 primary functions, used in addition to the normal PgSQL operations, that make non-blocking I/O possible. Here is what each one does and how it fits into a single-threaded, Libevent-based program.

int PQsetnonblocking(PGconn *connection, int arg);
Once connected to the database, we use this function to put the connection into non-blocking mode. If we don't, calls that send data to the server, such as PQsendQuery(), can block until the outgoing data has been flushed to the socket. It returns 0 on success and -1 on error.
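To see what "non-blocking" means at the socket level, here is a minimal POSIX sketch. It is not libpq code: set_nonblocking() and would_block() are hypothetical helper names, and PQsetnonblocking() manages libpq's own connection state rather than calling anything like this on your behalf. The point is the behavior: on a non-blocking descriptor, a read with nothing waiting returns immediately with EAGAIN instead of putting the program to sleep.

```c
#include <assert.h>
#include <errno.h>
#include <fcntl.h>
#include <sys/socket.h>
#include <unistd.h>

/* Hypothetical helper (not libpq): put fd into non-blocking mode while
 * keeping its other file status flags. Returns 0 on success, -1 on error. */
int set_nonblocking(int fd)
{
    int flags = fcntl(fd, F_GETFL, 0);

    if (flags == -1)
        return -1;
    return fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1 ? -1 : 0;
}

/* Hypothetical helper: attempt a one-byte read. On a non-blocking fd with
 * nothing waiting, read() fails at once with EAGAIN/EWOULDBLOCK instead of
 * sleeping. Returns 1 if the read would have blocked, 0 otherwise (any byte
 * that was read is consumed and discarded). */
int would_block(int fd)
{
    char c;
    ssize_t n = read(fd, &c, 1);

    return n == -1 && (errno == EAGAIN || errno == EWOULDBLOCK);
}
```

For example, after socketpair(AF_UNIX, SOCK_STREAM, 0, sv) and set_nonblocking(sv[0]), would_block(sv[0]) returns 1 until something is written to sv[1].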

int PQsocket(PGconn *connection);
No matter what event notification system you are using (select, epoll, kqueue, or a high-level API like Libevent), you need a file descriptor to watch for readiness. This function returns the file descriptor of the connection's underlying socket, or -1 if no server connection is currently open.
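Whatever notification system you pick, what it ultimately does with that descriptor is ask the kernel whether it is readable. A minimal sketch using plain poll(); wait_readable() is a hypothetical helper, not part of libpq or Libevent:

```c
#include <assert.h>
#include <poll.h>
#include <sys/socket.h>
#include <unistd.h>

/* Hypothetical helper: wait up to timeout_ms milliseconds for fd to become
 * readable (0 just checks, -1 waits forever).
 * Returns 1 if readable, 0 if not, -1 if poll() itself failed. */
int wait_readable(int fd, int timeout_ms)
{
    struct pollfd pfd = { .fd = fd, .events = POLLIN, .revents = 0 };
    int rc = poll(&pfd, 1, timeout_ms);

    if (rc <= 0)
        return rc;              /* 0: timed out, -1: error */
    return (pfd.revents & POLLIN) ? 1 : 0;
}
```

With libpq, fd would be the value returned by PQsocket(conn). In the program below, Libevent's event_set() and event_add() do this watching for us, across all of the connections at once.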

int PQconsumeInput(PGconn *connection);
Once your readiness notification system reports that one of your database connections is readable, you call this function. It reads all the data waiting on the socket into libpq's input buffer. Because it drains the socket, the readiness indication is cleared and your notification system can continue processing other requests. It normally returns 1, and returns 0 if there was some kind of trouble (PQerrorMessage() explains what went wrong). PQisBusy() must be called next to determine whether the response is complete.
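The drain-until-empty idea behind PQconsumeInput() can be sketched with plain read() calls. drain_fd() is a hypothetical helper written only to illustrate the pattern; libpq's real buffering is internal and more involved. It assumes the descriptor is already non-blocking and refuses to run otherwise, because on a blocking socket the final read() would hang instead of reporting EAGAIN.

```c
#include <assert.h>
#include <errno.h>
#include <fcntl.h>
#include <stddef.h>
#include <sys/socket.h>
#include <unistd.h>

/* Hypothetical helper: copy everything currently waiting on fd into buf,
 * appending at *len and never writing past cap. Stops once read() reports
 * EAGAIN/EWOULDBLOCK, i.e. the socket is drained and no longer readable.
 * Returns 0 on success; -1 on error, EOF, or if fd is not non-blocking.
 * If it returns 0 with *len == cap, data may still be waiting. */
int drain_fd(int fd, char *buf, size_t cap, size_t *len)
{
    int flags = fcntl(fd, F_GETFL, 0);

    if (flags == -1 || !(flags & O_NONBLOCK))
        return -1;

    while (*len < cap) {
        ssize_t n = read(fd, buf + *len, cap - *len);

        if (n > 0)
            *len += (size_t) n;
        else if (n == 0)
            return -1;          /* peer closed the connection */
        else if (errno == EAGAIN || errno == EWOULDBLOCK)
            return 0;           /* nothing left to read right now */
        else if (errno != EINTR)
            return -1;          /* real error; EINTR just retries */
    }
    return 0;                   /* buffer full; caller must make room */
}
```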

int PQisBusy(PGconn *connection);
This is called after PQconsumeInput(). It returns 1 if the command is still busy, meaning the server is not yet done sending its response and PQgetResult() would block waiting for more input. It returns 0 once a result is ready and PQgetResult() can be called without blocking.
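The question PQisBusy() answers is "has enough input been buffered yet?" A simplified, hypothetical illustration using the framing of the PostgreSQL v3 wire protocol, where each message is a 1-byte type followed by a 4-byte big-endian length that counts itself but not the type byte. libpq's real PQisBusy() works on its internal parser state, and a query result spans several messages, so frame_is_busy() only shows the idea for a single message.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical helper, not part of libpq.
 * Returns 1 ("busy": more input needed) if buf does not yet hold one
 * complete message, 0 if a whole message is buffered. */
int frame_is_busy(const unsigned char *buf, size_t len)
{
    uint32_t body;

    if (len < 5)
        return 1;               /* type byte + length word not all here */

    body = ((uint32_t) buf[1] << 24) | ((uint32_t) buf[2] << 16) |
           ((uint32_t) buf[3] << 8) | (uint32_t) buf[4];

    /* the length counts its own 4 bytes, so the message is 1 + body long */
    return len < (size_t) body + 1;
}
```

For example, the 6-byte ReadyForQuery message { 'Z', 0, 0, 0, 5, 'I' } is "busy" until all 6 bytes have arrived.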

Application Design.
I set my goal: I want to run several long-running DB queries in parallel while also displaying information on my screen, without the use of threads.

I work out a basic flow from start to end:
1. Connect to the database.
2. Initialize Libevent.
3. Install a timer event that displays the number of concurrent queries every x microseconds.
4. Send an SQL query on every DB connection without waiting for the result, and install an event handler that checks whether the response is ready.
5. Run the event loop.
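Step 3 needs the interval as a struct timeval, which is what Libevent's timer calls such as event_once() take: whole seconds in tv_sec plus the remainder in tv_usec, which should stay below 1,000,000. The program below simply hardcodes tv_usec = 150000; for an arbitrary x, a small hypothetical helper might look like this:

```c
#include <assert.h>
#include <sys/time.h>

/* Hypothetical helper: split a non-negative interval given in
 * microseconds into whole seconds plus leftover microseconds. */
struct timeval usec_to_timeval(long usec)
{
    struct timeval tv;

    tv.tv_sec = usec / 1000000;
    tv.tv_usec = usec % 1000000;
    return tv;
}
```

usec_to_timeval(150000) gives 0 seconds and 150000 microseconds, while usec_to_timeval(2500000) gives 2 seconds and 500000 microseconds.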

Next I create the database "jughead", create a table, and grant myself access to the database:
create sequence c start 101;
create table stuff (
    cid int4 DEFAULT NEXTVAL('c'),
    name char(50),
    number int4,
    data char(255),
    primary key(cid));
grant all on stuff to jughead;
grant all on c to jughead;

To simulate a PgSQL server under high load (and to get slow-returning queries), I insert hundreds of thousands of rows:
INSERT INTO stuff (name, number, data) VALUES ('name_1', 1, 'data_1');
INSERT INTO stuff (name, number, data) VALUES ('name_2', 2, 'data_2');
INSERT INTO stuff (name, number, data) VALUES ('name_3', 3, 'data_3');
etc., etc., etc.
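Typing hundreds of thousands of INSERTs by hand is not an option, so a tiny generator helps. This is a hypothetical sketch, not necessarily how the rows for this post were made: format_insert() and write_inserts() produce statements of the same shape as the ones above, one per line.

```c
#include <assert.h>
#include <stdio.h>

/* Hypothetical helper: format the INSERT for row i into buf. Returns the
 * snprintf() result: the length the full statement needs (excluding the
 * terminating NUL), or a negative value on error. */
int format_insert(char *buf, size_t len, int i)
{
    return snprintf(buf, len,
                    "INSERT INTO stuff (name, number, data) "
                    "VALUES ('name_%d', %d, 'data_%d');", i, i, i);
}

/* Hypothetical helper: write rows 1..count to out, one statement per line.
 * 128 bytes is enough for any int value of i. */
void write_inserts(FILE *out, int count)
{
    char line[128];

    for (int i = 1; i <= count; i++) {
        format_insert(line, sizeof line, i);
        fprintf(out, "%s\n", line);
    }
}
```

Calling write_inserts(stdout, 200000) from a small main() and piping the output into psql jughead would load the table; the row count is only an example.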

I quickly hacked up the following C code.


#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
#include <assert.h>
#include <libpq-fe.h>
#include <event.h>

int current_queries = 0;

typedef struct event_db {
    /*
     * a simple identifier
     */
    int id;
    struct event ev;
    PGconn *connection;
    /*
     * once a query has finished processing, call this function to deal
     * with the data
     */
    void (*callback) (void *);
} event_db_t;


void
do_timer(int fd, short event_type, void *arg)
{
    struct timeval tv;

    memset(&tv, 0, sizeof(tv));

    tv.tv_sec = 0;
    tv.tv_usec = 150000;

    /*
     * re-add this function to the event queue as a one-shot timeout;
     * a pure timer is not tied to a file descriptor, so pass -1
     */
    event_once(-1, EV_TIMEOUT, do_timer, arg, &tv);

    /*
     * notify the console of the fun!
     */
    fprintf(stderr, "%d: do_timer has awoken! (%d active queries)\n",
            (int) time(NULL), current_queries);
}

void
process_data(void *args)
{
    PGresult *result;
    event_db_t *db = (event_db_t *) args;

    /*
     * PQisBusy() has returned 0, so this call will not block
     */
    result = PQgetResult(db->connection);
    printf("ID %d returned!\n", db->id);
    printf(" Status: %s\n", PQresStatus(PQresultStatus(result)));
    printf(" Returned %d rows", PQntuples(result));
    printf(" with %d columns\n\n", PQnfields(result));

    PQclear(result);

    current_queries -= 1;
}


event_db_t *
create_event_db(char *connstr, int id, void (*cb) (void *))
{
    event_db_t *ret;

    if ((ret = malloc(sizeof(event_db_t))) == NULL)
        return NULL;

    if ((ret->connection = PQconnectdb(connstr)) == NULL)
        goto error;

    if (PQstatus(ret->connection) != CONNECTION_OK)
        goto error;

    /*
     * set this database connection as non-blocking! (0 means success)
     */
    if (PQsetnonblocking(ret->connection, 1) != 0)
        goto error;

    printf("ID %d has made a successful connection to the database\n", id);

    ret->id = id;
    ret->callback = cb;
    return ret;
error:
    if (ret->connection) {
        fprintf(stderr, "%s\n", PQerrorMessage(ret->connection));
        PQfinish(ret->connection);
    }

    free(ret);
    return NULL;
}

event_db_t **
mass_connector(char *connstr, int number, void (*cb) (void *))
{
    int i;
    event_db_t **connections;

    assert(number > 0);
    assert(cb != NULL);

    connections = malloc(number * sizeof(event_db_t *));
    assert(connections != NULL);

    for (i = 0; i < number; i++) {
        connections[i] = create_event_db(connstr, i, cb);
        assert(connections[i] != NULL);
    }

    return connections;
}

void
db_driver(int fd, short event_type, void *arg)
{
    event_db_t *conn = (event_db_t *) arg;

    /*
     * read in all data that is currently waiting for us; a return of 0
     * means something went wrong, so stop watching this connection
     */
    if (PQconsumeInput(conn->connection) == 0) {
        fprintf(stderr, "ID %d: %s\n", conn->id,
                PQerrorMessage(conn->connection));
        event_del(&conn->ev);
        current_queries -= 1;
        return;
    }

    if (PQisBusy(conn->connection) == 0) {
        /*
         * Everything is here, now call our previously defined callback to
         * deal with the finished data.
         */
        conn->callback(arg);

        /*
         * delete this from our event queue
         */
        event_del(&conn->ev);
    }

    /*
     * otherwise the connection would block, so we wait for more data to
     * arrive; EV_PERSIST keeps this handler installed
     */
}

void
do_query(event_db_t * conn, char *query)
{
    printf("%s: %p (%s)\n", __func__, (void *) conn, query);

    /*
     * send the query to our PgSQL connection; PQsendQuery() returns 0
     * if the query could not be dispatched
     */
    if (PQsendQuery(conn->connection, query) == 0) {
        fprintf(stderr, "%s\n", PQerrorMessage(conn->connection));
        return;
    }

    /*
     * Add the connection to our event queue, when the socket is readable
     * call the function db_driver
     */
    event_set(&conn->ev,
              /*
               * the real file descriptor of the backend socket
               */
              PQsocket(conn->connection),
              EV_READ | EV_PERSIST, db_driver, conn);
    event_add(&conn->ev, 0);

    current_queries++;
}

int
main(int argc, char **argv)
{
    int number_of_connections,
        i;
    char *query;
    event_db_t **db_connections;

    if (argc == 3) {
        number_of_connections = atoi(argv[1]);
        query = argv[2];
    } else {
        printf("Usage: %s <connection count> <query>\n", argv[0]);
        exit(1);
    }

    fprintf(stderr, "Using %d concurrent connections.\n",
            number_of_connections);
    fprintf(stderr, "Query: %s\n", query);

    /*
     * This will create several event_db_t structures, all of which will
     * have a connection to our database
     */
    db_connections = mass_connector(
        /* pgsql connect string */
        "dbname=jughead user=jughead",
        number_of_connections,
        /*
         * the function to call once all data has been retrieved from the
         * non-blocking database connection.
         */
        process_data);

    /*
     * We must initialize libevent before any events are added to our
     * queue.
     */
    event_init();

    /*
     * Install our timer event which will run every so often to report the
     * number of concurrent queries happening.
     */
    do_timer(-1, 0, NULL);

    /*
     * send the query to all of our connections in a non-blocking way.
     * do_query will install an event handler that will check if all data
     * has been received, and if it has it will call the process_data
     * function
     */
    for (i = 0; i < number_of_connections; i++)
        do_query(db_connections[i], query);

    event_loop(0);
    return 0;
}


jughead@naptime:~/code/dbtest> gcc -Wall -I/usr/local/pgsql/include jughead.c -o jughead -L/usr/local/pgsql/lib -lpq -levent
jughead@naptime:~/code/dbtest> ./jughead 5 "select * from stuff where data LIKE '%data%'"
Using 5 concurrent connections.
Query: select * from stuff where data LIKE '%data%'
ID 0 has made a successful connection to the database
ID 1 has made a successful connection to the database
ID 2 has made a successful connection to the database
ID 3 has made a successful connection to the database
ID 4 has made a successful connection to the database
1199142984: do_timer has awoken! (0 active queries)
do_query: 0x502040 (select * from stuff where data LIKE '%data%')
do_query: 0x50c660 (select * from stuff where data LIKE '%data%')
do_query: 0x515050 (select * from stuff where data LIKE '%data%')
do_query: 0x51d780 (select * from stuff where data LIKE '%data%')
do_query: 0x526060 (select * from stuff where data LIKE '%data%')
1199142984: do_timer has awoken! (5 active queries)
1199142985: do_timer has awoken! (5 active queries)
1199142985: do_timer has awoken! (5 active queries)
1199142985: do_timer has awoken! (5 active queries)
1199142985: do_timer has awoken! (5 active queries)
1199142985: do_timer has awoken! (5 active queries)
1199142986: do_timer has awoken! (5 active queries)
1199142986: do_timer has awoken! (5 active queries)
1199142986: do_timer has awoken! (5 active queries)
ID 0 returned!
Status: PGRES_TUPLES_OK
Returned 178448 rows with 4 columns

1199142987: do_timer has awoken! (4 active queries)
1199142987: do_timer has awoken! (4 active queries)
ID 4 returned!
Status: PGRES_TUPLES_OK
Returned 178448 rows with 4 columns

1199142988: do_timer has awoken! (3 active queries)
ID 1 returned!
Status: PGRES_TUPLES_OK
Returned 178448 rows with 4 columns

1199142988: do_timer has awoken! (2 active queries)
1199142988: do_timer has awoken! (2 active queries)
1199142988: do_timer has awoken! (2 active queries)
1199142988: do_timer has awoken! (2 active queries)
1199142988: do_timer has awoken! (2 active queries)
1199142988: do_timer has awoken! (2 active queries)
1199142989: do_timer has awoken! (2 active queries)
1199142989: do_timer has awoken! (2 active queries)
ID 3 returned!
Status: PGRES_TUPLES_OK
Returned 178448 rows with 4 columns

ID 2 returned!
Status: PGRES_TUPLES_OK
Returned 178448 rows with 4 columns

1199142989: do_timer has awoken! (0 active queries)
