Examples

Learn how to use the DAP Client Library.

The DAP client library follows the asynchronous programming paradigm and makes use of the Python keywords async and await. The examples below have to be executed in an asynchronous context, which you can enter by invoking asyncio.run. By default, a Python script runs in a synchronous context; you must wrap the examples below in an async function, or you will get a syntax error.

Initiate library

First, we need to instantiate the DAPClient class:

import os
from dap.api import DAPClient
from dap.dap_types import Credentials

base_url: str = os.environ["DAP_API_URL"]
client_id: str = os.environ["DAP_CLIENT_ID"]
client_secret: str = os.environ["DAP_CLIENT_SECRET"]

credentials = Credentials.create(client_id=client_id, client_secret=client_secret)
async with DAPClient(base_url, credentials) as session:
    ...

However, DAPClient can automatically extract the values of these parameters from the environment variables above, allowing us to write:

async with DAPClient() as session:
    ...

Note that DAPClient uses an asynchronous context manager. Keywords such as async with are permitted only in an asynchronous context. We can enter such a context by invoking asyncio.run(my_function(arg1, arg2, ...)).
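
For illustration, here is a minimal, complete script that wraps one of the examples below in an async function and executes it with asyncio.run; the main coroutine and its body are placeholders for any example on this page.

import asyncio

from dap.api import DAPClient

async def main() -> None:
    # Any of the examples below can live inside this coroutine.
    async with DAPClient() as session:
        tables = await session.get_tables("canvas")
        print(tables)

# asyncio.run enters the asynchronous context and runs the coroutine.
asyncio.run(main())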

Let’s explore a few common use cases with DAPClient.

Obtaining the latest schema

Before we obtain data, we need to get the latest schema of a table. The following example retrieves the JSON schema of the table accounts in the namespace canvas as a JSON schema object. A JSON object is a recursive Python data structure whose outermost layer is a Python dict whose keys are strings (type str) and whose values are JSON objects. We can use the Python package jsonschema to validate data against this JSON schema.

from dap.api import DAPClient

async with DAPClient() as session:
    schema = await session.get_table_schema("canvas", "accounts")
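
For illustration, a record parsed from a line of JSONL output could be validated against the retrieved schema with the jsonschema package. The record below is hypothetical; real records follow the layout described by the table's schema.

import jsonschema

# 'schema' is the dict returned by get_table_schema above.
# This record is a hypothetical example for illustration only.
record = {"key": {"id": 1}, "value": {"name": "Example account"}}

try:
    jsonschema.validate(instance=record, schema=schema)
    print("record conforms to the table schema")
except jsonschema.exceptions.ValidationError as exc:
    print(f"record does not conform to the table schema: {exc.message}")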

We can also save the schema to a file.

import os
from dap.api import DAPClient

output_directory: str = os.getcwd()
async with DAPClient() as session:
    tables = await session.get_tables("canvas")
    for table in tables:
        await session.download_table_schema("canvas", table, output_directory)

Fetching table data with a snapshot query

In order to get an initial copy of the full table contents, we need to perform a snapshot query. The format parameter determines the output data format; supported formats include CSV, TSV, JSONL, and Parquet. We recommend JSONL or Parquet. With JSONL, each line in the output can be parsed into a JSON object conforming to the JSON schema returned above.

import os
from dap.api import DAPClient
from dap.dap_types import Format, SnapshotQuery

output_directory = os.getcwd()
async with DAPClient() as session:
    query = SnapshotQuery(format=Format.JSONL, mode=None)
    await session.download_table_data(
        "canvas", "accounts", query, output_directory, decompress=True
    )
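
As a follow-up sketch, the downloaded (and, with decompress=True, already decompressed) JSONL files can be parsed line by line into Python dicts. The file-matching pattern below is an assumption about how the output files are named on disk.

import glob
import json
import os

# The "*.json*" pattern is an assumption; adjust it to the actual
# names of the files written by download_table_data.
for path in glob.glob(os.path.join(output_directory, "**", "*.json*"), recursive=True):
    with open(path, encoding="utf-8") as fp:
        for line in fp:
            record = json.loads(line)
            print(record)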

Getting latest changes with an incremental query

Once an initial snapshot has been obtained, we need to keep the data synchronized with DAP. This is possible with incremental queries. The following, more complex example gets all changes since a specified since timestamp, and saves each data file on the server to an output file in the local filesystem. The last_seen timestamp is typically the until returned by a previous incremental query.

import os
from datetime import datetime, timezone
from urllib.parse import ParseResult, urlparse

import aiofiles

from dap.api import DAPClient
from dap.dap_types import Format, IncrementalQuery

# timestamp returned by last snapshot or incremental query
last_seen = datetime(2023, 2, 1, 0, 0, 0, tzinfo=timezone.utc)

async with DAPClient() as session:
    query = IncrementalQuery(
        format=Format.JSONL,
        mode=None,
        since=last_seen,
        until=None,
    )
    result = await session.get_table_data("canvas", "accounts", query)
    resources = await session.get_resources(result.objects)
    for resource in resources.values():
        components: ParseResult = urlparse(str(resource.url))
        file_path = os.path.join(
            os.getcwd(), "data", os.path.basename(components.path)
        )
        async for stream in session.stream_resource(resource):
            async with aiofiles.open(file_path, "wb") as file:
                # save gzip data to file without decompressing
                async for chunk in stream.iter_chunked(64 * 1024):
                    await file.write(chunk)
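
Continuing the example above, a common follow-up (sketched here under the assumption that the query result exposes the end of the processed window as result.until) is to persist that timestamp so the next incremental query can resume from it; the file name last_seen.txt is arbitrary.

    # Persist the end of the processed window; feed it back in as "since"
    # (last_seen) on the next incremental query. The file name is arbitrary.
    async with aiofiles.open("last_seen.txt", "w") as file:
        await file.write(result.until.isoformat())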

Replicating data to a database

Earlier sections have shown how to obtain the latest schema, fetch data with a snapshot query, or get the latest changes with an incremental query. These are low-level operations that give you full control over what you do with the data.

However, in most cases we want high-level operations that ensure our database (whether running locally or in the cloud) is synchronized with the data in DAP, without having to worry about the specifics of the data transfer. This is possible with two operations that

  1. initialize a database, and

  2. synchronize a database with the data in DAP.

In order to replicate data in DAP locally, we must first initialize a database:

from dap.api import DAPClient
from dap.integration.database import DatabaseConnection, DatabaseConnectionConfig
from dap.replicator.sql import SQLReplicator

# Define connection parameters as an object
connection_params = DatabaseConnectionConfig(
    dialect="postgresql",
    host="server.example.com",
    port=5432,
    username="scott",
    password="password",
    database="testdb"
)

# Initialize database connection with the object-based parameters
db_connection = DatabaseConnection(connection_params)

# Namespace and table to replicate (matching the earlier examples)
namespace = "canvas"
table_name = "accounts"

# Use the DAPClient and SQLReplicator
async with DAPClient() as session:
    sql_replicator = SQLReplicator(session, db_connection)
    await sql_replicator.version_upgrade()
    await sql_replicator.initialize(namespace, table_name)

Initialization creates a database schema for the DAP namespace and a corresponding database table for each DAP table. In addition, it creates a meta-table, which is a special database table that holds synchronization information, e.g. the last time the data was synchronized with DAP and the schema version that the locally stored data conforms to. Finally, it issues a snapshot query to the DAP API and populates the database table with the output returned by the snapshot query.

Synchronizing data with a database

Once the table has been initialized, it can be kept up to date using the synchronize operation:

db_connection = DatabaseConnection(connection_params)
async with DAPClient() as session:
    sql_replicator = SQLReplicator(session, db_connection)
    await sql_replicator.version_upgrade()
    await sql_replicator.synchronize(namespace, table_name)

This inspects the information in the meta-table and issues an incremental query to the DAP API with a since timestamp corresponding to the last synchronization time. Based on the results of the incremental query, it inserts records that have been added to the DAP service, updates records that have changed, and deletes records that have been removed.

If the local schema version in the meta-table is identical to the remote schema version in DAP, inserting, updating and deleting records proceeds normally. However, if there is a mismatch, the table structure of the local database has to evolve to match the current structure of the data in DAP. This includes the following schema changes in the back-end:

  • A new required (a.k.a. non-nullable) field (column) is added. The new field has a default value assigned to it in the schema.

  • A new optional (a.k.a. nullable) field (column) is added to a table.

  • A new enumeration value is added to an existing enumeration type.

  • A new enumeration type is introduced.

  • A field (column) is removed from a table.

Behind the scenes, the client library uses SQL commands such as ALTER TABLE ... ADD COLUMN ... or ALTER TYPE ... ADD VALUE ... to replicate DAP schema changes in our local database. If a JSON schema change could not be mapped to a series of such SQL statements, the client library would be unable to synchronize with DAP using incremental queries, and would have to issue an expensive snapshot query.

Once the local database table structure has been reconciled with the new schema in DAP, and the meta-table has been updated, data synchronization proceeds normally with insert, update and delete SQL statements.

Dropping data from a database

If some tables are no longer needed, they can be dropped from the database using the following code:

# SQLDrop is assumed to be importable alongside SQLReplicator;
# check the import path in your version of the dap client library.
from dap.replicator.sql import SQLDrop

db_connection = DatabaseConnection(connection_params)
# table_names may name a single table or several, e.g. ["accounts", "courses"]
await SQLDrop(db_connection).drop(namespace, table_names)

Executing operations on multiple tables

Using the session, operations can be executed sequentially on multiple tables (named in the table_names string):

async with DAPClient() as session:
    sql_replicator = SQLReplicator(session, db_connection)
    await sql_replicator.version_upgrade()
    async def replicate_table_fn(namespace: str, table: str) -> None:
        await sql_replicator.initialize(namespace, table)
    await session.execute_operation_on_tables(
        namespace, table_names, "export", replicate_table_fn
    )

The table_names string can be a single table name, a comma-separated list of table names, or all for all tables in the namespace. execute_operation_on_tables runs the operation on every table sequentially, regardless of individual success or failure, but raises an aggregate exception containing all failures as sub-exceptions if the operation failed on any table. In the future this may change to support running operations on tables concurrently.
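
For instance, continuing inside the async with block above and reusing the replicate_table_fn defined there, every table in the namespace could be processed by passing all (a sketch, not an additional API):

    await session.execute_operation_on_tables(
        "canvas", "all", "export", replicate_table_fn
    )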

Configure log level for debugging

The client library uses the Python logging module to log messages. The default log level is INFO. You can change the log level by adding the following code to the beginning of your script; the snippet also adds a timestamp to each log message.

import logging

# ... other imports

logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger("dap")
logger.setLevel(logging.DEBUG)
logger.propagate = False
handler = logging.StreamHandler()
handler.setFormatter(logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s"))
logger.addHandler(handler)
