Cloud ETL & data management

Managed ETL patterns for Dataflow, Azure Data Factory, and AWS Glue: ingest, use, update, and delete greatmemory data.

Managed ETL services are a good fit when memory needs to be loaded from many documents, object stores, tables, or data warehouses on a schedule. The pattern is the same on every cloud:

  1. Extract records or files from the source system.
  2. Convert each item into clean text with a source URI and metadata.
  3. POST each item to /v1/memories in a dedicated greatmemory space.
  4. Save the returned memory_id in a manifest table.
  5. Use POST /v1/search with mode: "context" when an agent needs the data.
  6. Delete by manifest with DELETE /v1/memories/{id} when the source data expires or should be removed.
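
Step 2 can be sketched as a small, cloud-independent helper. This is only an illustration under stated assumptions: the function names, the 4000-character limit, and the `#section=N` URI suffix are hypothetical and not part of greatmemory; the SOURCE/HASH header follows the convention used in the examples on this page.

```python
import hashlib
import re
from typing import Iterator


def clean_text(raw: str) -> str:
    """Collapse runs of spaces/tabs and squeeze repeated blank lines."""
    lines = [re.sub(r"[ \t]+", " ", line).strip() for line in raw.splitlines()]
    text = re.sub(r"\n{3,}", "\n\n", "\n".join(lines))
    return text.strip()


def split_sections(text: str, max_chars: int = 4000) -> Iterator[str]:
    """Yield paragraph-aligned sections of at most max_chars where possible.

    A single paragraph longer than max_chars is kept whole, not cut mid-text.
    """
    current = ""
    for para in text.split("\n\n"):
        candidate = f"{current}\n\n{para}" if current else para
        if current and len(candidate) > max_chars:
            yield current
            current = para
        else:
            current = candidate
    if current:
        yield current


def to_memory_items(source_uri: str, raw: str, max_chars: int = 4000) -> list[dict]:
    """Turn one source document into ready-to-post items, one per section."""
    items = []
    for index, section in enumerate(split_sections(clean_text(raw), max_chars)):
        # Hypothetical convention: a fragment keeps each section individually traceable.
        section_uri = f"{source_uri}#section={index}"
        source_hash = hashlib.sha256(section.encode("utf-8")).hexdigest()
        items.append({
            "source_uri": section_uri,
            "source_hash": source_hash,
            "content": f"SOURCE: {section_uri}\nHASH: {source_hash}\n\n{section}",
        })
    return items
```

Each returned item carries the `content` to post in step 3 and the `source_uri` and `source_hash` to store in the manifest in step 4.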

Data management rules

Concern         Practical rule
Spaces          Use one space per tenant, project, customer, or environment.
Provenance      Prefix every memory with SOURCE: <uri> and keep the same URI in your manifest.
Updates         Store a source_hash; if the hash changes, delete the old memory id and add the new text.
Deletion        Never rely on text search to clean up data. Delete by the memory_id saved at ingest time.
Batches         Add an import_batch_id so a whole ETL run can be rolled back.
Sensitive data  Redact or filter in the ETL job before posting to greatmemory.
Large files     Extract text first, split very large documents by section/page if needed, then POST each section.
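
The Updates rule can be sketched as one function that compares hashes before touching greatmemory. This is a minimal sketch, not a definitive implementation: the manifest is modeled as a plain dict keyed by source URI, and `post_memory` and `delete_memory` are hypothetical callables that wrap POST /v1/memories and DELETE /v1/memories/{id}.

```python
import hashlib
from typing import Callable, Optional


def sync_source(
    manifest: dict,
    source_uri: str,
    text: str,
    post_memory: Callable[[str], str],     # hypothetical: posts content, returns memory_id
    delete_memory: Callable[[str], None],  # hypothetical: deletes by memory_id
) -> Optional[str]:
    """Apply the update rule; return the new memory_id, or None if unchanged."""
    source_hash = hashlib.sha256(text.encode("utf-8")).hexdigest()
    existing = manifest.get(source_uri)
    if existing and existing["source_hash"] == source_hash:
        return None  # same content as the last run, nothing to do

    if existing:
        # Delete by the id saved at ingest time, never by text search.
        delete_memory(existing["memory_id"])
        # Drop the row right away so a failed POST cannot leave a stale id behind.
        del manifest[source_uri]

    content = f"SOURCE: {source_uri}\nHASH: {source_hash}\n\n{text}"
    memory_id = post_memory(content)
    manifest[source_uri] = {"source_hash": source_hash, "memory_id": memory_id}
    return memory_id
```

In a real job the dict would be the manifest table, and the two callables would be thin wrappers around the REST calls shown in the cloud-specific sections.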

Google Cloud: Dataflow

Use Dataflow when you want a managed Apache Beam pipeline for Cloud Storage, BigQuery, Pub/Sub, or Cloud SQL extracts. Google describes Dataflow as a fully managed service that runs Apache Beam batch and streaming pipelines.

Minimal Apache Beam shape:

import hashlib
import os

import apache_beam as beam
import requests

# Dataflow workers must be able to reach GM_URL; the localhost default only
# suits local test runs.
GM_URL = os.environ.get("GM_URL", "http://127.0.0.1:7437")
SPACE = os.environ.get("GM_SPACE", "gcp-etl")

class ToMemory(beam.DoFn):
    """Posts one source row to greatmemory and emits the manifest row for it."""

    def process(self, row):
        source_uri = row["source_uri"]
        text = row["text"]
        # The hash lets later runs detect changed source text.
        source_hash = hashlib.sha256(text.encode("utf-8")).hexdigest()
        content = f"SOURCE: {source_uri}\nHASH: {source_hash}\n\n{text}"
        res = requests.post(
            f"{GM_URL}/v1/memories",
            json={"space": SPACE, "content": content},
            timeout=15,
        )
        res.raise_for_status()
        yield {
            "source_uri": source_uri,
            "source_hash": source_hash,
            "memory_id": res.json()["id"],
            "space": SPACE,
        }

# Reading from a query stages results in Cloud Storage, so run the pipeline
# with a temp_location option (or pass gcs_location to ReadFromBigQuery).
with beam.Pipeline() as p:
    (
        p
        | "Read source rows" >> beam.io.ReadFromBigQuery(
            # Raw string: '\n' must reach BigQuery as an escape sequence, not as
            # a literal line break inside a single-quoted SQL string.
            query=r"""
            SELECT
              CONCAT('bigquery://support.case_summaries/', CAST(case_id AS STRING)) AS source_uri,
              CONCAT('Customer: ', customer_id, '\nSummary: ', summary) AS text
            FROM `my_project.support.case_summaries`
            WHERE updated_at >= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 7 DAY)
            """,
            use_standard_sql=True,
        )
        | "Post to greatmemory" >> beam.ParDo(ToMemory())
        | "Write manifest" >> beam.io.WriteToBigQuery(
            # Beam table spec: PROJECT:DATASET.TABLE
            "my_project:greatmemory_manifest.imported_memories",
            schema="source_uri:STRING,source_hash:STRING,memory_id:STRING,space:STRING",
            write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
        )
    )

For removal, run a small Dataflow or scheduled Cloud Run job that reads manifest rows for the retired source URI or batch and calls:

curl -s -X DELETE "$GM_URL/v1/memories/$MEMORY_ID"

Azure: Data Factory

Use Azure Data Factory when you want managed ETL/ELT orchestration with connectors for Blob Storage, Azure SQL, Cosmos DB, and other systems. Microsoft describes Azure Data Factory as Azure's cloud ETL service for scale-out serverless data integration and transformation.

Recommended pipeline shape:

  1. Use Copy activity to land source files/rows into a staging container or staging table.
  2. Use a Web activity or Azure Function activity to call a small ingestion function.
  3. The function extracts text, redacts sensitive fields, posts to greatmemory, and writes source_uri, source_hash, memory_id, space, and import_batch_id into an Azure SQL manifest table.

Example Azure Function core:

import hashlib
import os

import requests

GM_URL = os.environ.get("GM_URL", "http://127.0.0.1:7437")
SPACE = os.environ.get("GM_SPACE", "azure-etl")

def ingest_item(item: dict) -> dict:
    """Post one extracted (and already redacted) item; return its manifest row."""
    source_uri = item["source_uri"]
    text = item["text"]
    source_hash = hashlib.sha256(text.encode("utf-8")).hexdigest()
    content = f"SOURCE: {source_uri}\nHASH: {source_hash}\n\n{text}"

    res = requests.post(
        f"{GM_URL}/v1/memories",
        json={"space": SPACE, "content": content},
        timeout=15,
    )
    res.raise_for_status()

    # The caller writes this row into the Azure SQL manifest table.
    return {
        "source_uri": source_uri,
        "source_hash": source_hash,
        "memory_id": res.json()["id"],
        "space": SPACE,
        # Assumes the pipeline passes its run id along with each item.
        "import_batch_id": item.get("import_batch_id"),
    }

Deletion is another Data Factory pipeline: query the manifest table for rows to remove, call DELETE /v1/memories/{id} for each memory_id, and mark the manifest row as deleted.
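
The query, delete, and mark-deleted loop can be sketched as follows. The sketch uses sqlite3 purely as a stand-in for the Azure SQL manifest table so that it runs anywhere; the table layout, the `deleted_at` column, and the `delete_memory` callable (a wrapper around DELETE /v1/memories/{id} that raises on failure) are all assumptions for illustration.

```python
import sqlite3
from datetime import datetime, timezone
from typing import Callable

# Hypothetical manifest layout; deleted_at is an assumed soft-delete marker.
MANIFEST_DDL = """
CREATE TABLE IF NOT EXISTS manifest (
    memory_id       TEXT PRIMARY KEY,
    source_uri      TEXT NOT NULL,
    source_hash     TEXT NOT NULL,
    space           TEXT NOT NULL,
    import_batch_id TEXT,
    deleted_at      TEXT
)
"""


def purge_batch(
    conn: sqlite3.Connection,
    import_batch_id: str,
    delete_memory: Callable[[str], None],
) -> int:
    """Delete every live memory of one ETL run and mark its manifest rows."""
    rows = conn.execute(
        "SELECT memory_id FROM manifest "
        "WHERE import_batch_id = ? AND deleted_at IS NULL ORDER BY memory_id",
        (import_batch_id,),
    ).fetchall()
    purged = 0
    for (memory_id,) in rows:
        # If this raises, the row stays unmarked and is retried on the next run.
        delete_memory(memory_id)
        conn.execute(
            "UPDATE manifest SET deleted_at = ? WHERE memory_id = ?",
            (datetime.now(timezone.utc).isoformat(), memory_id),
        )
        conn.commit()  # commit per row so progress survives a mid-run failure
        purged += 1
    return purged
```

Marking rows instead of removing them keeps an audit trail of what was imported and when it was removed; selecting by `source_uri` instead of `import_batch_id` works the same way.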

AWS: Glue

Use AWS Glue when you want serverless data integration for S3, RDS/Aurora, DynamoDB exports, or cataloged data lake tables. AWS describes AWS Glue as a serverless data integration service for discovering, preparing, moving, and integrating data from multiple sources.

For Glue Python shell jobs, batch requests per partition and write a manifest to DynamoDB, RDS, or S3:

import hashlib
import os

import boto3
import requests

GM_URL = os.environ.get("GM_URL", "http://127.0.0.1:7437")
SPACE = os.environ.get("GM_SPACE", "aws-etl")
# The manifest table name comes from the job's environment.
manifest = boto3.resource("dynamodb").Table(os.environ["MANIFEST_TABLE"])

def ingest_record(record):
    """Post one record to greatmemory, then save its manifest row."""
    source_uri = record["source_uri"]
    text = record["text"]
    source_hash = hashlib.sha256(text.encode("utf-8")).hexdigest()
    content = f"SOURCE: {source_uri}\nHASH: {source_hash}\n\n{text}"

    res = requests.post(
        f"{GM_URL}/v1/memories",
        json={"space": SPACE, "content": content},
        timeout=15,
    )
    res.raise_for_status()
    memory_id = res.json()["id"]

    # Written only after a successful POST, so every manifest row has a real id.
    manifest.put_item(Item={
        "source_uri": source_uri,
        "memory_id": memory_id,
        "source_hash": source_hash,
        "space": SPACE,
    })

For a Glue Spark job, put the same ingest_record logic inside foreachPartition and create one requests.Session per partition, so each worker reuses connections instead of opening a new one for every row.
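
A per-partition version might look like the sketch below. It is an illustration under assumptions: `make_session` and `make_writer` are hypothetical factories, called inside the partition function because HTTP sessions and database clients generally cannot be serialized and shipped to workers. The session only needs `post` and `close`, which `requests.Session` provides.

```python
import hashlib
from typing import Any, Callable, Iterable


def ingest_partition(
    rows: Iterable[Any],
    make_session: Callable[[], Any],                  # e.g. requests.Session
    make_writer: Callable[[], Callable[[dict], Any]],  # hypothetical manifest writer factory
    gm_url: str,
    space: str,
) -> int:
    """Post every row of one partition over a single session; return the count."""
    session = make_session()      # one session per partition, not per row
    write_manifest = make_writer()
    count = 0
    try:
        for row in rows:
            source_uri = row["source_uri"]
            text = row["text"]
            source_hash = hashlib.sha256(text.encode("utf-8")).hexdigest()
            content = f"SOURCE: {source_uri}\nHASH: {source_hash}\n\n{text}"
            res = session.post(
                f"{gm_url}/v1/memories",
                json={"space": space, "content": content},
                timeout=15,
            )
            res.raise_for_status()
            write_manifest({
                "source_uri": source_uri,
                "source_hash": source_hash,
                "memory_id": res.json()["id"],
                "space": space,
            })
            count += 1
    finally:
        session.close()
    return count
```

In a Spark job this would typically be wired up as `df.foreachPartition(lambda rows: ingest_partition(rows, requests.Session, make_writer, GM_URL, SPACE))`, where `make_writer` could return a function that calls `put_item` on a DynamoDB table created inside the worker.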

For deletion, read manifest rows from DynamoDB/RDS/S3 and call:

curl -s -X DELETE "$GM_URL/v1/memories/$MEMORY_ID"

Using managed ETL output in agents

After ETL has loaded data, agents do not need to know where it came from. They ask greatmemory for context:

curl -s "$GM_URL/v1/search" \
  -H 'Content-Type: application/json' \
  -d '{"space":"customer-ops","query":"latest Acme renewal risks","mode":"context","max_tokens":1200}'

The returned block then goes into the model prompt. The SOURCE: <uri> prefix on each memory gives the agent and the operator enough provenance to trace an answer back to the ETL source.
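
On the agent side, the request body and the prompt assembly can be sketched as two small functions. The request fields mirror the curl call above; everything else is an assumption: the function names, the prompt wording, and the `<context>` tags are hypothetical, and the sketch takes the context block as a plain string because how it is extracted from the response depends on the response format.

```python
import json


def build_search_request(space: str, query: str, max_tokens: int = 1200) -> bytes:
    """Encode the JSON body for POST /v1/search in context mode."""
    body = {"space": space, "query": query, "mode": "context", "max_tokens": max_tokens}
    return json.dumps(body).encode("utf-8")


def build_prompt(context_block: str, question: str) -> str:
    """Place the returned context block ahead of the user's question."""
    return (
        # Hypothetical instruction text; adapt it to the model in use.
        "Answer using only the context below. "
        "Cite the SOURCE URI of every fact you rely on.\n\n"
        f"<context>\n{context_block.strip()}\n</context>\n\n"
        f"Question: {question}"
    )
```

Asking the model to cite the SOURCE line keeps the provenance visible in the final answer, not only in the retrieved block.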

Failure handling

  • Retry POST /v1/memories with idempotency at the source level: if the same source_uri and source_hash already exist in the manifest, skip the record instead of posting it again.
  • Keep a dead-letter table or object prefix for extraction failures.
  • Rate-limit REST calls when posting from many ETL workers.
  • Run a reconciliation job that periodically compares the source inventory with the manifest and deletes memories whose source object or row no longer exists.
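
The first bullet can be sketched as a manifest check followed by a retry loop with exponential backoff. This is a sketch under assumptions: `post` is a hypothetical callable that wraps POST /v1/memories and raises on failure, `seen` stands in for a manifest lookup on (source_uri, source_hash), and the attempt count and delays are arbitrary starting points.

```python
import time
from typing import Callable, Optional, Set, Tuple


def post_with_retry(
    post: Callable[[str], str],
    content: str,
    attempts: int = 4,
    base_delay: float = 0.5,
    sleep: Callable[[float], None] = time.sleep,
) -> str:
    """Call post(content), doubling the wait after each failed attempt."""
    for attempt in range(attempts):
        try:
            return post(content)
        except Exception:  # sketch only: narrow this to transient errors in a real job
            if attempt == attempts - 1:
                raise
            sleep(base_delay * (2 ** attempt))
    raise ValueError("attempts must be at least 1")


def ingest_once(
    seen: Set[Tuple[str, str]],
    source_uri: str,
    source_hash: str,
    content: str,
    post: Callable[[str], str],
    **retry_options,
) -> Optional[str]:
    """Skip records already in the manifest; otherwise post with retries."""
    key = (source_uri, source_hash)
    if key in seen:
        return None  # same source and same content: already ingested
    memory_id = post_with_retry(post, content, **retry_options)
    seen.add(key)
    return memory_id
```

The backoff also acts as a crude rate limit for a single worker; with many workers, a shared limiter or a smaller worker count is still needed.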