Memory

TouAI Memory provides Data Lakehouse object storage through three interfaces: a simple API that uses default resources, a full organization-scoped API, and a personal API.

Simple API

Use the simple API when you want the SDK to resolve the default workspace, project, and bucket automatically.

# Open the file in a context manager so the handle is closed after the upload.
with open("data.csv", "rb") as f:
    stored = client.storage.store(f)
print(stored.key)
 
stored = client.storage.store_text(
    '{"users": [{"name": "Alice"}]}',
    "data.json",
)
 
files = client.storage.files(prefix="data/")
url = client.storage.download_url("data/file.csv", expires_in=3600)
info = client.storage.file_info("data/file.csv")
client.storage.delete_file("data/old.csv")
overview = client.storage.info()

| Method | Return Type | Description |
| --- | --- | --- |
| store(file) | StoredObject | Upload a binary file |
| store_text(content, filename) | StoredObject | Store text or JSON |
| files(prefix=...) | list[StoredFile] | List files |
| download_url(key) | str | Get a pre-signed download URL |
| file_info(key) | FileInfo | Get file metadata |
| delete_file(key) | None | Delete a file |
| info() | StorageInfo | Memory overview |

Full API

Projects

projects = client.storage.projects.list("org-id")
project = client.storage.projects.create("org-id", "Analytics", description="...")
project = client.storage.projects.get("org-id", "project-id")
client.storage.projects.update("org-id", "project-id", name="Renamed")
client.storage.projects.delete("org-id", "project-id")
 
members = client.storage.projects.members("org-id", "project-id")
client.storage.projects.add_member("org-id", "project-id", "user-id", role="editor")
client.storage.projects.update_member("org-id", "project-id", "member-id", role="admin")
client.storage.projects.remove_member("org-id", "project-id", "member-id")

Buckets

bucket = client.storage.buckets.create("org-id", "project-id", "raw-data")
buckets = client.storage.buckets.list("org-id", "project-id")
bucket = client.storage.buckets.get("org-id", "bucket-id")
client.storage.buckets.update("org-id", "bucket-id", name="renamed")
client.storage.buckets.delete("org-id", "bucket-id")

Objects

with open("file.csv", "rb") as f:
    client.storage.objects.upload("org-id", "bucket-id", f, path="data/")
 
signed = client.storage.objects.upload_signed_url(
    "org-id",
    "bucket-id",
    "reports/q1.pdf",
    "application/pdf",
)
 
page = client.storage.objects.list(
    "org-id",
    "bucket-id",
    prefix="reports/",
    delimiter="/",
    limit=100,
)
 
url = client.storage.objects.download("org-id", "bucket-id", "reports/q1.pdf")
info = client.storage.objects.info("org-id", "bucket-id", "reports/q1.pdf")
client.storage.objects.delete("org-id", "bucket-id", "reports/q1.pdf")
client.storage.objects.bulk_delete("org-id", "bucket-id", ["old1.csv", "old2.csv"])
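
The value returned by upload_signed_url is meant to be used by an HTTP client outside the SDK. The sketch below shows one way to prepare such a request with the Python standard library. The helper name build_presigned_put is hypothetical, and the assumption that the signed result exposes a url attribute is not confirmed by this page.

```python
import urllib.request


def build_presigned_put(url: str, body: bytes, content_type: str) -> urllib.request.Request:
    """Build (but do not send) a PUT request for a presigned upload URL."""
    request = urllib.request.Request(url, data=body, method="PUT")
    # Presigned URLs are often signed for a specific Content-Type, so send the
    # same value that was passed to upload_signed_url.
    request.add_header("Content-Type", content_type)
    return request


# Hypothetical usage; `signed.url` is an assumed attribute name:
# request = build_presigned_put(signed.url, pdf_bytes, "application/pdf")
# with urllib.request.urlopen(request) as response:
#     print(response.status)
```

Building the request separately from sending it makes it easy to inspect the method and headers before any bytes leave the machine.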

Large-file upload

upload_large streams the file directly from the SDK process to S3 over a presigned URL. Bytes never pass through the service pod, so upload speed and reliability are limited by the local network rather than by the service. The method selects a single PUT for files smaller than 5 GiB and S3 multipart upload (64 MiB parts by default) for larger files. On failure it aborts the upload session so that partial bytes do not accumulate.

result = client.storage.objects.upload_large(
    "org-id",
    "bucket-id",
    "benchmarks/video.mp4",
    key="raw/video.mp4",          # optional; defaults to local filename
    content_type="video/mp4",     # optional; guessed from the path
    part_size=64 * 1024 * 1024,   # multipart-path only
)
print(result["key"], result["size"], result["etag"])
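
The selection rule described for upload_large can be illustrated with a small standalone calculation. This is a sketch of the stated thresholds only, not the SDK's implementation; the function plan_upload and its return shape are hypothetical.

```python
GIB = 1024 ** 3
MIB = 1024 ** 2

SINGLE_PUT_LIMIT = 5 * GIB      # threshold stated in the description above
DEFAULT_PART_SIZE = 64 * MIB    # default part size stated in the description above


def plan_upload(size_bytes: int, part_size: int = DEFAULT_PART_SIZE) -> dict:
    """Return the upload strategy and part count for a file of the given size."""
    if size_bytes < 0:
        raise ValueError("size_bytes must be non-negative")
    if size_bytes < SINGLE_PUT_LIMIT:
        return {"strategy": "single_put", "parts": 1}
    # Multipart: every part is part_size bytes except possibly the last one,
    # so the part count is the ceiling of size / part_size.
    parts = (size_bytes + part_size - 1) // part_size
    return {"strategy": "multipart", "parts": parts}


print(plan_upload(200 * MIB))   # {'strategy': 'single_put', 'parts': 1}
print(plan_upload(8 * GIB))     # {'strategy': 'multipart', 'parts': 128}
```

S3 itself caps a multipart upload at 10,000 parts, so 64 MiB parts cover objects up to 625 GiB; beyond that a larger part_size is needed. Whether the SDK adjusts part_size automatically is not stated here.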

Low-level multipart upload

Use the low-level multipart methods for custom resumable or parallel upload flows. multipart_initiate returns the upload_id; call multipart_sign_part to get a presigned PUT URL for each part, capture the ETag from each PUT response, then call multipart_complete with the part list.

init = client.storage.objects.multipart_initiate(
    "org-id", "bucket-id", "data.bin", content_type="application/octet-stream",
)
part = client.storage.objects.multipart_sign_part(
    "org-id", "bucket-id",
    upload_id=init.upload_id,
    key="data.bin",
    part_number=1,
)
# httpx.put(part.url, content=chunk) → capture ETag from response headers
client.storage.objects.multipart_complete(
    "org-id", "bucket-id",
    init.upload_id,
    "data.bin",
    [{"part_number": 1, "etag": "..."}],
)
 
# On error:
client.storage.objects.multipart_abort(
    "org-id", "bucket-id", init.upload_id, "data.bin",
)
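
The example above handles a single part. For a file with several parts, the sign, PUT, and collect-ETag steps repeat once per chunk. The sketch below shows that loop with the SDK and HTTP calls injected as callables so it runs without network access; iter_parts, upload_parts, sign_part, and put_part are hypothetical names, not part of the SDK.

```python
import io
from typing import BinaryIO, Callable, Iterator


def iter_parts(stream: BinaryIO, part_size: int) -> Iterator[tuple[int, bytes]]:
    """Yield (part_number, chunk) pairs; part numbers start at 1."""
    part_number = 1
    while True:
        chunk = stream.read(part_size)
        if not chunk:
            break
        yield part_number, chunk
        part_number += 1


def upload_parts(
    stream: BinaryIO,
    part_size: int,
    sign_part: Callable[[int], str],
    put_part: Callable[[str, bytes], str],
) -> list[dict]:
    """Sign and upload each part, returning the list for multipart_complete."""
    completed = []
    for part_number, chunk in iter_parts(stream, part_size):
        url = sign_part(part_number)   # would wrap multipart_sign_part(...)
        etag = put_part(url, chunk)    # would wrap an HTTP PUT and return the ETag header
        completed.append({"part_number": part_number, "etag": etag})
    return completed


# Demo with stand-in callables in place of the SDK and the HTTP client.
fake_sign = lambda n: f"https://example.invalid/part/{n}"
fake_put = lambda url, chunk: f"etag-{len(chunk)}"
parts = upload_parts(io.BytesIO(b"x" * 10), 4, fake_sign, fake_put)
print(parts)
```

In a real flow this loop would sit inside a try/except that calls multipart_abort on failure, and the returned list would be passed to multipart_complete.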

Personal Memory

projects = client.storage.my.projects.list()
project = client.storage.my.projects.create("My Project")
 
buckets = client.storage.my.buckets.list("project-id")
bucket = client.storage.my.buckets.create("project-id", "my-bucket")
 
with open("file.csv", "rb") as f:
    client.storage.my.objects.upload("bucket-id", f)
objects = client.storage.my.objects.list("bucket-id", prefix="data/")
url = client.storage.my.objects.download("bucket-id", "data/file.csv")
info = client.storage.my.objects.info("bucket-id", "data/file.csv")
client.storage.my.objects.delete("bucket-id", "data/file.csv")
client.storage.my.objects.bulk_delete("bucket-id", ["old1.csv", "old2.csv"])

Presigned uploads (personal)

These methods mirror the org-scoped variants, with org_id resolved from the personal workspace.

signed = client.storage.my.objects.upload_signed_url(
    "bucket-id",
    "reports/q1.pdf",
    "application/pdf",
    expires_in=3600,
)
 
result = client.storage.my.objects.upload_large(
    "bucket-id",
    "benchmarks/video.mp4",
    key="raw/video.mp4",
    content_type="video/mp4",
)

Low-level multipart upload (personal)

init = client.storage.my.objects.multipart_initiate(
    "bucket-id", "data.bin", content_type="application/octet-stream",
)
part = client.storage.my.objects.multipart_sign_part(
    "bucket-id",
    upload_id=init.upload_id,
    key="data.bin",
    part_number=1,
)
client.storage.my.objects.multipart_complete(
    "bucket-id",
    init.upload_id,
    "data.bin",
    [{"part_number": 1, "etag": "..."}],
)
 
client.storage.my.objects.multipart_abort(
    "bucket-id", init.upload_id, "data.bin",
)