Storing timeseries data in Postgres on AWS RDS

“Big Data” these days usually means “time series.” There are only two relevant attributes of time series data: like Strega Nona’s pasta, it tends to grow without bound, while simultaneously becoming less useful as time passes.

I can understand why developers, faced with an overflowing pot of pasta (data), are quick to look elsewhere besides their SQL database, but as someone who’s been down that path with DynamoDB and Redshift, let me tell you that Postgres had what you were looking for all along.

Sharding the data

Before we begin, let me say that I owe an immense debt to this Engine Yard post for being the only comprehensive source for most of this information.

Suppose you’re keeping tabs on the current weather conditions. You start with a simple table:

CREATE TABLE public.weather (
  "created_at" TIMESTAMP WITH TIME ZONE NOT NULL,
  "location" GEOMETRY,
  "temperature" FLOAT
);
CREATE INDEX ON public.weather ("created_at");
CREATE INDEX ON public.weather USING GIST ("location");
CREATE INDEX ON public.weather ("temperature");

but soon the table grows so big that simply inserting rows and updating the relevant indices slows everything down.

You’d much rather have multiple tables weather_2015_01, weather_2015_02, etc. which contain only the events for that particular month. Should you pre-create all the tables to a decade out? Should you write custom application code to route weather events to the appropriate table? What about querying for a time range? More application code?

Heck no.

By judiciously using Postgres’ support for child tables (using INHERITS) and writing a function to be triggered on insert, we can automate all of the above!

CREATE OR REPLACE FUNCTION public.weather_partition_function()
RETURNS TRIGGER AS
$BODY$
  DECLARE
  _new_date timestamp with time zone;
  _tablename text;
  _startdate text;
  _enddate text;
  BEGIN
    -- Truncate the inbound "created_at" to the first instant of its month
    -- (month boundaries follow the session's TimeZone setting)
    _new_date := date_trunc('month', NEW.created_at);
    _tablename := 'weather_' || to_char(_new_date, 'YYYY_MM');

    -- Check if the partition needed for the current record exists
    PERFORM 1
    FROM   pg_catalog.pg_class c
    JOIN   pg_catalog.pg_namespace n ON n.oid = c.relnamespace
    WHERE  c.relkind = 'r'
    AND    c.relname = _tablename
    AND    n.nspname = 'public';

    -- If the partition needed does not yet exist, then we create it:
    -- Note that || is string concatenation (joining two strings to make one)
    IF NOT FOUND THEN
      -- Render both bounds as full timestamptz literals; a bare 'YYYY_MM'
      -- string cannot be cast to a date or timestamp
      _startdate := _new_date::text;
      _enddate := (_new_date + INTERVAL '1 month')::text;
      EXECUTE
      'CREATE TABLE public.' || quote_ident(_tablename) || ' (
        CHECK (created_at >= TIMESTAMPTZ ' || quote_literal(_startdate) || '
          AND created_at < TIMESTAMPTZ ' || quote_literal(_enddate) || '
            ) ) INHERITS (public.weather)';

      -- Table permissions are not inherited from the parent
      -- If permissions change on the master, change them on the child as well
      EXECUTE 'ALTER TABLE public.'
        || quote_ident(_tablename) || ' OWNER TO me';
      EXECUTE 'GRANT SELECT ON TABLE public.'
        || quote_ident(_tablename) || ' TO PUBLIC';
      EXECUTE 'GRANT INSERT ON TABLE public.'
        || quote_ident(_tablename) || ' TO PUBLIC';

      -- Indices are defined per child table
      EXECUTE 'CREATE INDEX ON public.'
        || quote_ident(_tablename) || ' (created_at)';
      EXECUTE 'CREATE INDEX ON public.'
        || quote_ident(_tablename) || ' USING GIST (location)';
      EXECUTE 'CREATE INDEX ON public.'
        || quote_ident(_tablename) || ' (temperature)';
    END IF;

    -- Insert the current record into the correct partition, which we are sure will now exist
    EXECUTE 'INSERT INTO public.'
      || quote_ident(_tablename) || ' VALUES ($1.*)' USING NEW;
    RETURN NULL;
  END;
$BODY$
LANGUAGE plpgsql;

Running this function on every insertion is straightforward:

CREATE TRIGGER weather_trigger
  BEFORE INSERT ON public.weather
  FOR EACH ROW EXECUTE PROCEDURE public.weather_partition_function();
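
A quick way to see the routing in action is to insert a couple of rows through the parent and ask Postgres where they landed. This is only a sketch: it assumes the trigger above is installed and working, that PostGIS is available (for the GEOMETRY column and ST_MakePoint), and the coordinates and temperatures are made up for illustration.

```sql
-- Insert through the parent; the trigger reroutes each row to its monthly shard.
-- Because the trigger function returns NULL, psql reports "INSERT 0 0"
-- even though the rows were stored in the child tables.
INSERT INTO public.weather (created_at, location, temperature)
VALUES
  ('2015-01-15 12:00:00+00', ST_SetSRID(ST_MakePoint(-122.42, 37.77), 4326), 12.5),
  ('2015-02-03 08:30:00+00', ST_SetSRID(ST_MakePoint(-73.99, 40.73), 4326), -3.0);

-- tableoid identifies the physical table each row lives in.
SELECT tableoid::regclass AS shard, count(*) AS n
FROM public.weather
GROUP BY 1;

-- ONLY restricts the scan to the parent itself, which should stay empty.
SELECT count(*) FROM ONLY public.weather;
```

The "INSERT 0 0" result is worth knowing about up front, since some ORMs and client libraries treat a zero row count as a failed insert.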

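There are two tricks in the above: checking pg_catalog.pg_class to see if the correct table shard exists, and CREATE TABLE ... INHERITS (public.weather) with a CHECK constraint on created_at. The first lets us create tables automatically as needed. The second makes every child visible through the parent and, with constraint exclusion on (the default setting of partition is enough), lets the planner skip children whose CHECK constraint rules out the WHERE clause. That allows us to perform

SELECT *
FROM public.weather
WHERE created_at >= TIMESTAMPTZ '2015-01-01' AND created_at < TIMESTAMPTZ '2015-02-01'

and have it read only the weather_2015_01 child table (the parent is scanned too, but it is always empty). The bounds are written as timestamptz constants because the planner only excludes shards when it can evaluate the comparison at plan time, and comparing a timestamptz column with a date depends on the session time zone.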
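
To confirm which shards a query actually touches, the plan can be inspected directly. This is a minimal sketch, assuming the weather shards already exist and the session time zone is UTC (so the bounds below line up with the shard boundaries). The exact plan text varies by Postgres version, so no output is shown.

```sql
-- 'partition' (the default) applies constraint exclusion to inheritance children.
SHOW constraint_exclusion;

-- Shards whose CHECK constraint rules out the WHERE clause should be absent
-- from the plan. Timestamptz constants keep the comparison immutable.
EXPLAIN
SELECT *
FROM public.weather
WHERE created_at >= TIMESTAMPTZ '2015-01-01 00:00:00+00'
  AND created_at <  TIMESTAMPTZ '2015-02-01 00:00:00+00';
```

If every child table still shows up in the plan, the usual suspects are a non-constant bound such as now() or a CHECK constraint that does not match the column's type.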

Reaping old shards

Clearing out old data can now be as simple as DROP TABLE weather_2014_01. However, we’ve seen how simple our life became by appropriately scripting the insertion process, so let’s try the same for deletion.

CREATE OR REPLACE FUNCTION public.partition_maintenance(in_tablename_prefix text, in_asof date)
RETURNS text AS
$BODY$
  DECLARE
  _result record;
  _suffix text;
  _return_message text;
  BEGIN
    -- Start with an empty message; each drop or failure appends to it
    _return_message := '';

    -- Validate input to function
    IF in_tablename_prefix IS NULL THEN
      RETURN 'Child table name prefix must be provided'::text;
    ELSIF in_asof IS NULL THEN
      RETURN 'You must provide the as-of date, (current_date - interval ''1 year'') is the typical value';
    END IF;

    FOR _result IN SELECT * FROM pg_tables WHERE schemaname='public' LOOP
      -- Only consider tables named <prefix>_YYYY_MM; _suffix is NULL otherwise
      _suffix := substring(_result.tablename from '[0-9]{4}_(?:0[1-9]|1[0-2])$');
      -- Drop a shard only once its whole month is older than the as-of date
      IF _suffix IS NOT NULL
         AND _result.tablename = in_tablename_prefix || '_' || _suffix
         AND in_asof >= to_date(_suffix, 'YYYY_MM') + INTERVAL '1 month'
        THEN
        BEGIN
          -- Drop the child partition
          EXECUTE 'DROP TABLE public.'
            || quote_ident(_result.tablename);
          _return_message := _return_message
            || 'Dropped table: ' || _result.tablename::text || ', ';
          RAISE NOTICE 'Dropped table %', _result.tablename::text;
          EXCEPTION WHEN OTHERS THEN
          _return_message := _return_message
            || 'ERROR dropping table: ' || _result.tablename::text || ', ';
          RAISE NOTICE 'ERROR DROPPING %', _result.tablename::text;
        END;
      END IF;
    END LOOP;

    RETURN _return_message || 'Done'::text;
  END;
$BODY$
LANGUAGE plpgsql VOLATILE COST 100;

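To clear out old shards, simply execute SELECT partition_maintenance('weather', (CURRENT_DATE - INTERVAL '1 year')::date) to get rid of anything from more than a year ago. The cast is needed because subtracting an interval from a date yields a timestamp, which Postgres will not implicitly convert to the function's date argument.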
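
Since DROP TABLE is irreversible, it can be worth previewing the candidates first. The query below is a sketch that only lists tables. It assumes the weather_YYYY_MM naming used above and treats a shard as expired once its whole month is at least a year old; adjust the comparison if your cutoff rule differs.

```sql
-- Dry run: list weather shards whose month ended at least a year ago.
-- substring() returns NULL for names that do not match the pattern, so
-- to_date() is never handed a malformed suffix.
SELECT tablename, month_start
FROM (
  SELECT tablename,
         to_date(substring(tablename from '^weather_([0-9]{4}_(?:0[1-9]|1[0-2]))$'),
                 'YYYY_MM') AS month_start
  FROM pg_tables
  WHERE schemaname = 'public'
) AS shards
WHERE month_start + INTERVAL '1 month' <= CURRENT_DATE - INTERVAL '1 year'
ORDER BY tablename;
```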

Why bother?

If you’re in the business of collecting time series data, you’ve probably found that keeping the data in a separate store (DynamoDB, MongoDB, CSVs in S3, etc.) is a huge pain when you want to join it with some of your other data sets. Keeping everything in one database like Postgres is a huge win for ad-hoc queries, and with old shards dropped as they age out, the data needn’t bog down your Postgres instance.