Cloud ETL & data management
Managed ETL patterns for Dataflow, Azure Data Factory, and AWS Glue: ingest, use, update, and delete greatmemory data.
Managed ETL services are a good fit when memory needs to be loaded from many documents, object stores, tables, or data warehouses on a schedule. The pattern is the same on every cloud:
- Extract records or files from the source system.
- Convert each item into clean text with a source URI and metadata.
- POST each item to `/v1/memories` in a dedicated greatmemory space.
- Save the returned `memory_id` in a manifest table.
- Use `POST /v1/search` with `mode: "context"` when an agent needs the data.
- Delete by manifest with `DELETE /v1/memories/{id}` when the source data expires or should be removed.
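The conversion step can be sketched as a small pure function. It mirrors the `SOURCE:`/`HASH:` header used by the cloud examples on this page; the extra metadata lines, the whitespace normalization before hashing, and the name `build_content` are assumptions for illustration, not part of the greatmemory API.

```python
import hashlib
import re
from typing import Optional, Tuple


def build_content(source_uri: str, text: str,
                  metadata: Optional[dict] = None) -> Tuple[str, str]:
    """Return (content, source_hash) for one extracted item.

    Assumption: whitespace is normalized before hashing so that re-extracting
    an unchanged document does not look like an update.
    """
    # Collapse runs of spaces inside each line, then squeeze blank-line runs.
    lines = [" ".join(line.split()) for line in text.splitlines()]
    clean = re.sub(r"\n{3,}", "\n\n", "\n".join(lines)).strip()
    source_hash = hashlib.sha256(clean.encode("utf-8")).hexdigest()

    header = [f"SOURCE: {source_uri}", f"HASH: {source_hash}"]
    # Hypothetical extra metadata lines, sorted so the layout is stable.
    for key, value in sorted((metadata or {}).items()):
        header.append(f"{key.upper()}: {value}")
    return "\n".join(header) + "\n\n" + clean, source_hash
```

Hashing the normalized text rather than the raw bytes is a design choice: it trades sensitivity to formatting-only edits for fewer needless delete-and-re-add cycles.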
Data management rules
| Concern | Practical rule |
|---|---|
| Spaces | Use one space per tenant, project, customer, or environment. |
| Provenance | Prefix every memory with SOURCE: <uri> and keep the same URI in your manifest. |
| Updates | Store a source_hash; if the hash changes, delete the old memory id and add the new text. |
| Deletion | Never rely on text search to clean up data. Delete by the memory_id saved at ingest time. |
| Batches | Add an import_batch_id so a whole ETL run can be rolled back. |
| Sensitive data | Redact or filter in the ETL job before posting to greatmemory. |
| Large files | Extract text first, split very large documents by section/page if needed, then POST each section. |
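The Updates and Deletion rules reduce to a diff between the manifest and the current source inventory. A minimal sketch, assuming one memory per source URI, a manifest loaded as a dict keyed by `source_uri`, and source items that have already been hashed; `plan_sync` and `SyncPlan` are hypothetical names:

```python
from typing import Dict, List, NamedTuple


class SyncPlan(NamedTuple):
    add: List[str]        # source URIs to POST (new or changed)
    delete: List[str]     # memory_ids to DELETE (changed or no longer in source)
    unchanged: List[str]  # source URIs to skip


def plan_sync(manifest: Dict[str, dict], source: Dict[str, str]) -> SyncPlan:
    """manifest: source_uri -> {"memory_id": ..., "source_hash": ...}
    source:   source_uri -> current source_hash
    """
    to_add, to_delete, unchanged = [], [], []
    for uri, new_hash in source.items():
        row = manifest.get(uri)
        if row is None:
            to_add.append(uri)
        elif row["source_hash"] != new_hash:
            # Changed text: delete the old memory id, then add the new text.
            to_delete.append(row["memory_id"])
            to_add.append(uri)
        else:
            unchanged.append(uri)
    # Manifest rows whose source disappeared are deleted by memory_id.
    for uri, row in manifest.items():
        if uri not in source:
            to_delete.append(row["memory_id"])
    return SyncPlan(sorted(to_add), sorted(to_delete), sorted(unchanged))
```

Running the deletes before the adds keeps at most one live memory per source URI, and the same diff run on a schedule also serves as a reconciliation pass.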
Google Cloud: Dataflow
Use Dataflow when you want a managed Apache Beam pipeline for Cloud Storage, BigQuery, Pub/Sub, or Cloud SQL extracts. Google describes Dataflow as a fully managed service that runs Apache Beam batch and streaming pipelines.
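A minimal Apache Beam pipeline that reads recent rows from BigQuery, posts each one to greatmemory, and appends the returned ids to a BigQuery manifest table:
```python
import hashlib
import os

import apache_beam as beam
import requests

GM_URL = os.environ.get("GM_URL", "http://127.0.0.1:7437")
SPACE = os.environ.get("GM_SPACE", "gcp-etl")


class ToMemory(beam.DoFn):
    """Posts each row to greatmemory and emits one manifest row."""

    def __init__(self, gm_url, space):
        # Config travels with the serialized DoFn, so workers do not depend
        # on module globals from the launching machine.
        super().__init__()
        self.gm_url = gm_url
        self.space = space

    def setup(self):
        # One HTTP session per DoFn instance, reused for every row it processes.
        self.session = requests.Session()

    def teardown(self):
        self.session.close()

    def process(self, row):
        source_uri = row["source_uri"]
        text = row["text"]
        source_hash = hashlib.sha256(text.encode("utf-8")).hexdigest()
        content = f"SOURCE: {source_uri}\nHASH: {source_hash}\n\n{text}"
        res = self.session.post(
            f"{self.gm_url}/v1/memories",
            json={"space": self.space, "content": content},
            timeout=15,
        )
        res.raise_for_status()
        yield {
            "source_uri": source_uri,
            "source_hash": source_hash,
            "memory_id": res.json()["id"],
            "space": self.space,
        }


with beam.Pipeline() as p:
    (
        p
        # Raw string: BigQuery receives the '\n' escape; a literal newline
        # inside a single-quoted BigQuery string literal is a syntax error.
        | "Read source rows" >> beam.io.ReadFromBigQuery(
            query=r"""
                SELECT
                  CONCAT('bigquery://support.case_summaries/', CAST(case_id AS STRING)) AS source_uri,
                  CONCAT('Customer: ', customer_id, '\nSummary: ', summary) AS text
                FROM `my_project.support.case_summaries`
                WHERE updated_at >= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 7 DAY)
            """,
            use_standard_sql=True,
        )
        | "Post to greatmemory" >> beam.ParDo(ToMemory(GM_URL, SPACE))
        | "Write manifest" >> beam.io.WriteToBigQuery(
            "my_project:greatmemory_manifest.imported_memories",
            schema="source_uri:STRING,source_hash:STRING,memory_id:STRING,space:STRING",
            write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
        )
    )
```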
For removal, run a small Dataflow or scheduled Cloud Run job that reads manifest rows for the retired source URI or batch and calls:
```shell
curl -s -X DELETE "$GM_URL/v1/memories/$MEMORY_ID"
```
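The core loop of that removal job might look like the sketch below. It accepts any `requests.Session`-compatible object, so the same function can run in a Dataflow `DoFn`, a Cloud Run job, or a test. `delete_by_manifest` is a hypothetical name, and treating 404 as "already deleted" is an assumption about greatmemory's behavior that this page does not document.

```python
from typing import Iterable, List, Tuple
from urllib.parse import quote


def delete_by_manifest(session, gm_url: str,
                       memory_ids: Iterable[str]) -> Tuple[List[str], List[Tuple[str, int]]]:
    """Call DELETE /v1/memories/{id} for each id.

    Returns (deleted, failed); failed holds (memory_id, status_code) pairs.
    """
    deleted: List[str] = []
    failed: List[Tuple[str, int]] = []
    for memory_id in memory_ids:
        # Percent-encode the id so unexpected characters cannot alter the path.
        url = f"{gm_url}/v1/memories/{quote(memory_id, safe='')}"
        res = session.delete(url, timeout=15)
        # Assumption: 404 means the memory is already gone, which is the goal.
        if 200 <= res.status_code < 300 or res.status_code == 404:
            deleted.append(memory_id)
        else:
            failed.append((memory_id, res.status_code))
    return deleted, failed
```

Mark manifest rows as deleted only for ids in `deleted`, and retry or dead-letter the ids in `failed`.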
Azure: Data Factory
Use Azure Data Factory when you want managed ETL/ELT orchestration with connectors for Blob Storage, Azure SQL, Cosmos DB, and other systems. Microsoft describes Azure Data Factory as Azure's cloud ETL service for scale-out serverless data integration and transformation.
Recommended pipeline shape:
- Use a Copy activity to land source files or rows in a staging container or staging table.
- Use a Web activity or Azure Function activity to call a small ingestion function.
- The function extracts text, redacts sensitive fields, posts to greatmemory, and writes `source_uri`, `source_hash`, `memory_id`, `space`, and `import_batch_id` into an Azure SQL manifest table.
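Core of an example Azure Function. It returns the manifest row; the caller writes that row to the Azure SQL manifest table:
```python
import hashlib
import os

import requests

GM_URL = os.environ.get("GM_URL", "http://127.0.0.1:7437")
SPACE = os.environ.get("GM_SPACE", "azure-etl")

# Module-level session: reused across invocations while the worker stays warm.
session = requests.Session()


def ingest_item(item: dict, import_batch_id: str) -> dict:
    """Post one staged item and return its manifest row."""
    source_uri = item["source_uri"]
    text = item["text"]
    source_hash = hashlib.sha256(text.encode("utf-8")).hexdigest()
    content = f"SOURCE: {source_uri}\nHASH: {source_hash}\n\n{text}"
    res = session.post(
        f"{GM_URL}/v1/memories",
        json={"space": SPACE, "content": content},
        timeout=15,
    )
    res.raise_for_status()
    return {
        "source_uri": source_uri,
        "source_hash": source_hash,
        "memory_id": res.json()["id"],
        "space": SPACE,
        # Lets a whole ETL run be rolled back later.
        "import_batch_id": import_batch_id,
    }
```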
Deletion is another Data Factory pipeline: query the manifest table for rows to
remove, call DELETE /v1/memories/{id} for each memory_id, and mark the manifest
row as deleted.
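The call-and-mark step of that deletion pipeline can be sketched in Python. Here `sqlite3` stands in for the Azure SQL manifest table, and the `manifest` table name, the nullable `deleted_at` column, the `retire_batch` helper, and the `delete_memory` callable are all hypothetical. With Azure SQL, the same parameterized statements should carry over with minor changes.

```python
import sqlite3
from typing import Callable


def retire_batch(conn: sqlite3.Connection, import_batch_id: str,
                 delete_memory: Callable[[str], bool]) -> int:
    """Delete every live memory in one import batch and mark its manifest row.

    delete_memory(memory_id) is assumed to call DELETE /v1/memories/{id}
    and return True on success.
    """
    # Fetch all ids first so updates do not interfere with an open cursor.
    rows = conn.execute(
        "SELECT memory_id FROM manifest "
        "WHERE import_batch_id = ? AND deleted_at IS NULL",
        (import_batch_id,),
    ).fetchall()
    retired = 0
    for (memory_id,) in rows:
        if delete_memory(memory_id):
            # Mark only after greatmemory confirms, so failures retry next run.
            conn.execute(
                "UPDATE manifest SET deleted_at = CURRENT_TIMESTAMP "
                "WHERE memory_id = ?",
                (memory_id,),
            )
            conn.commit()
            retired += 1
    return retired
```

Because rows are marked one at a time, a pipeline that fails halfway can simply be re-run: already-retired rows are filtered out by `deleted_at IS NULL`.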
AWS: Glue
Use AWS Glue when you want serverless data integration for S3, RDS/Aurora, DynamoDB exports, or cataloged data lake tables. AWS describes AWS Glue as a serverless data integration service for discovering, preparing, moving, and integrating data from multiple sources.
For Glue Python shell jobs, reuse one HTTP session for every request and write a manifest row to DynamoDB, RDS, or S3 for each memory. This example uses DynamoDB:
```python
import hashlib
import os

import boto3
import requests

GM_URL = os.environ.get("GM_URL", "http://127.0.0.1:7437")
SPACE = os.environ.get("GM_SPACE", "aws-etl")

# One HTTP session and one DynamoDB table handle for the whole job.
session = requests.Session()
manifest = boto3.resource("dynamodb").Table(os.environ["MANIFEST_TABLE"])


def ingest_record(record):
    source_uri = record["source_uri"]
    text = record["text"]
    source_hash = hashlib.sha256(text.encode("utf-8")).hexdigest()
    content = f"SOURCE: {source_uri}\nHASH: {source_hash}\n\n{text}"
    res = session.post(
        f"{GM_URL}/v1/memories",
        json={"space": SPACE, "content": content},
        timeout=15,
    )
    res.raise_for_status()
    memory_id = res.json()["id"]
    # Record the id so the memory can later be deleted by manifest.
    manifest.put_item(Item={
        "source_uri": source_uri,
        "memory_id": memory_id,
        "source_hash": source_hash,
        "space": SPACE,
    })
    return memory_id
```
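For a Glue Spark job, call the same logic from a function passed to `foreachPartition`, and create the HTTP session and the DynamoDB `Table` inside that function. Each executor then builds its own clients and reuses one connection for the whole partition instead of opening a new one for every row.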
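A per-partition function might look like this sketch. `ingest_partition` is a hypothetical name; it takes a `session_factory` (for example `requests.Session`) so that the session is created on the executor, and it yields manifest rows instead of writing them so the caller decides where they go.

```python
import hashlib
from typing import Callable, Iterable, Iterator


def ingest_partition(rows: Iterable[dict], session_factory: Callable,
                     gm_url: str, space: str) -> Iterator[dict]:
    """Post every row in one partition through a single HTTP session."""
    session = session_factory()  # one session per partition, not per row
    try:
        for row in rows:
            source_uri = row["source_uri"]
            text = row["text"]
            source_hash = hashlib.sha256(text.encode("utf-8")).hexdigest()
            content = f"SOURCE: {source_uri}\nHASH: {source_hash}\n\n{text}"
            res = session.post(
                f"{gm_url}/v1/memories",
                json={"space": space, "content": content},
                timeout=15,
            )
            res.raise_for_status()
            yield {
                "source_uri": source_uri,
                "source_hash": source_hash,
                "memory_id": res.json()["id"],
                "space": space,
            }
    finally:
        # Runs when the partition is exhausted or the generator is closed.
        session.close()
```

Inside `foreachPartition`, convert each Spark `Row` with `row.asDict()`, iterate the generator, and `put_item` each yielded row through a `Table` created in that same function.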
For deletion, read manifest rows from DynamoDB/RDS/S3 and call:
```shell
curl -s -X DELETE "$GM_URL/v1/memories/$MEMORY_ID"
```
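When the manifest lives in DynamoDB, reading it means paging through scan results: a single `Scan` call returns at most 1 MB of data, so stopping after the first page silently skips manifest rows. A minimal sketch; `iter_manifest_items` is a hypothetical helper that accepts any object with a boto3-style `scan(**kwargs)` method:

```python
from typing import Any, Dict, Iterator


def iter_manifest_items(table: Any, **scan_kwargs: Any) -> Iterator[Dict[str, Any]]:
    """Yield every item from table.scan, following LastEvaluatedKey."""
    kwargs = dict(scan_kwargs)
    while True:
        page = table.scan(**kwargs)
        yield from page.get("Items", [])
        last_key = page.get("LastEvaluatedKey")
        if not last_key:
            return
        # Resume the scan where the previous page stopped.
        kwargs["ExclusiveStartKey"] = last_key
```

With boto3 this could be called as `iter_manifest_items(table, FilterExpression=Attr("source_uri").eq(retired_uri))`, where `Attr` comes from `boto3.dynamodb.conditions` and `retired_uri` is the source being removed.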
Using managed ETL output in agents
After ETL has loaded data, agents do not need to know where it came from. They ask greatmemory for context:
```shell
curl -s "$GM_URL/v1/search" \
  -H 'Content-Type: application/json' \
  -d '{"space":"customer-ops","query":"latest Acme renewal risks","mode":"context","max_tokens":1200}'
```
Put the returned context block into the model prompt. Because every memory starts with `SOURCE: <uri>`, the agent and the operator can trace an answer back to the ETL source and, through the manifest, to its `memory_id`.
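The same request from Python, plus one way to place the result in a prompt. This page does not document the search response body, so `search_context` returns the parsed JSON unchanged and leaves extracting the context block to the caller; `search_context`, `build_prompt`, and the prompt wording are assumptions for illustration.

```python
from typing import Any, Dict


def search_context(session, gm_url: str, space: str, query: str,
                   max_tokens: int = 1200) -> Dict[str, Any]:
    """POST /v1/search in context mode and return the parsed JSON response."""
    res = session.post(
        f"{gm_url}/v1/search",
        json={"space": space, "query": query, "mode": "context",
              "max_tokens": max_tokens},
        timeout=15,
    )
    res.raise_for_status()
    return res.json()


def build_prompt(context_block: str, question: str) -> str:
    """Place the retrieved context ahead of the user's question."""
    return (
        "Use the context below. Each memory starts with a SOURCE: line; "
        "cite it when you rely on that memory.\n\n"
        f"<context>\n{context_block}\n</context>\n\n"
        f"Question: {question}"
    )
```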
Failure handling
- Make retries of `POST /v1/memories` safe with idempotency at the source level: if the same `source_uri` and `source_hash` already exist in the manifest, skip the item.
- Keep a dead-letter table or object prefix for extraction failures.
- Rate-limit REST calls when posting from many ETL workers.
- Run a reconciliation job that periodically compares the source inventory with the manifest and deletes memories whose source object or row no longer exists.
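The retry, idempotency, and dead-letter rules can be sketched together. This assumes that 429 and common 5xx statuses are transient (greatmemory's actual error codes are not documented on this page) and that the manifest is loaded as a dict keyed by `(source_uri, source_hash)`. `send_with_retry`, `ingest_all`, and the `send_for` callable (one POST for one item, returning a response object) are hypothetical names.

```python
import random
import time
from functools import partial
from typing import Any, Callable, Dict, List, Tuple

# Assumption: these statuses are transient and safe to retry.
RETRYABLE = {429, 500, 502, 503, 504}


def send_with_retry(send: Callable[[], Any], attempts: int = 5,
                    base_delay: float = 0.5,
                    sleep: Callable[[float], None] = time.sleep) -> Any:
    """Call send() until its status is not retryable or attempts run out."""
    if attempts < 1:
        raise ValueError("attempts must be at least 1")
    res = send()
    for attempt in range(1, attempts):
        if res.status_code not in RETRYABLE:
            break
        # Exponential backoff with full jitter.
        sleep(random.uniform(0, base_delay * 2 ** (attempt - 1)))
        res = send()
    return res


def ingest_all(items: List[Dict[str, str]],
               manifest: Dict[Tuple[str, str], str],
               send_for: Callable[[Dict[str, str]], Any],
               dead_letter: List[Dict[str, Any]],
               **retry_kwargs: Any) -> None:
    """Post new items, skip ones already in the manifest, dead-letter failures."""
    for item in items:
        key = (item["source_uri"], item["source_hash"])
        if key in manifest:
            continue  # this exact source version was already ingested
        res = send_with_retry(partial(send_for, item), **retry_kwargs)
        if 200 <= res.status_code < 300:
            manifest[key] = res.json()["id"]
        else:
            dead_letter.append({**item, "status": res.status_code})
```

Backoff spreads retries out over time, but it does not replace a per-worker rate limit when many ETL workers post at once.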