Commit 6a8e2e99 authored by Dimitris Lampridis's avatar Dimitris Lampridis

further cleanup and re-arrangement of files

parent 53f5b0e3
This diff is collapsed.
files = ["dummy_ctrl_regs.vhd",
"dummy_stat_regs.vhd",
"wb_addr_decoder.vhd"]
files = ["spec_gn4124_test.vhd"]
files = ["dummy_ctrl_regs.vhd",
"dummy_stat_regs.vhd",
"wb_addr_decoder.vhd"]
......@@ -11,8 +11,8 @@ syn_tool = "ise"
files = ["../spec_gn4124_test.ucf",
"../ip_cores/l2p_fifo.ngc"]
modules = { "local" : ["../rtl",
"../../common/rtl",
modules = { "local" : ["../top",
"../rtl",
"../../gn4124core/rtl"],
"git" : "git://ohwr.org/hdl-core-lib/general-cores.git::proposed_master"}
......
files = ["spec_gn4124_test.vhd"]
This diff is collapsed.
DIRS = kernel user bench doc python
all:
@for d in $(DIRS); do $(MAKE) -C $$d $@ || exit 1; done
clean:
@for d in $(DIRS); do $(MAKE) -C $$d $@ || exit 1; done
This work is part of the White Rabbit project at CERN (cern.ch).
The package includes the raw I/O driver and the GN4124 driver. The kernel
part lives in the kernel/ subdir of the package, while user-space utilities
are under user/
Documentation is under doc/ and is written in Texinfo, which though
old is simple enough to require little efforts on my side. Output is
pdf, text and info.
To compile, just "make". If you miss TeX or texinfo you won't have the
docs. If you miss the compiler or gmake you won't have anything.
To make clean just "make clean".
CFLAGS = -Wall -ggdb -I../kernel
AS = $(CROSS_COMPILE)as
LD = $(CROSS_COMPILE)ld
CC = $(CROSS_COMPILE)gcc
CPP = $(CC) -E
AR = $(CROSS_COMPILE)ar
NM = $(CROSS_COMPILE)nm
STRIP = $(CROSS_COMPILE)strip
OBJCOPY = $(CROSS_COMPILE)objcopy
OBJDUMP = $(CROSS_COMPILE)objdump
ALL = ioctl irq878 rdwr
all: $(ALL)
clean:
rm -f $(ALL) *.o *~
\ No newline at end of file
/*
* Trivial performance test for ioctl I/O
*
* Copyright (C) 2010 Alessandro Rubini <rubini@gnudd.com>
* Released according to the GNU GPL, version 2 or any later version.
*
* This work is part of the White Rabbit project and has been sponsored
* by CERN, the European Institute for Nuclear Research.
*/
#include <stdio.h>
#include <unistd.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <fcntl.h>
#include <sys/time.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <sys/ioctl.h>
#include "rawrabbit.h"
#define DEVNAME "/dev/rawrabbit"
#define ARRAY_SIZE(arr) (sizeof(arr) / sizeof((arr)[0]))
struct rr_iocmd iocmd = {
.address = __RR_SET_BAR(4) | 0xa08,
.datasize = 4,
};
int main(int argc, char **argv)
{
int fd, count, count0, usec;
struct timeval tv1, tv2;
if (argc != 2) {
fprintf(stderr, "%s: use \"%s <count>\"\n", argv[0], argv[0]);
exit(1);
}
count0 = count = atoi(argv[1]);
if (!count) {
fprintf(stderr, "%s: not a number \"%s\"\n", argv[0], argv[1]);
exit(1);
}
fd = open(DEVNAME, O_RDWR);
if (fd < 0) {
fprintf(stderr, "%s: %s: %s\n", argv[0], DEVNAME,
strerror(errno));
exit(1);
}
gettimeofday(&tv1, NULL);
while (count--) {
static int values[] = {
/* These make a pwm signal on the leds */
0xf000, 0xf000, 0xf000, 0xf000,
0xe000, 0xc000, 0x8000, 0x0000
};
iocmd.data32 = values[ count % ARRAY_SIZE(values) ];
if (ioctl(fd, RR_WRITE, &iocmd) < 0) {
fprintf(stderr, "%s: %s: ioctl: %s\n", argv[0], DEVNAME,
strerror(errno));
exit(1);
}
}
gettimeofday(&tv2, NULL);
usec = (tv2.tv_sec - tv1.tv_sec) * 1000 * 1000
+ tv2.tv_usec - tv1.tv_usec;
printf("%i ioctls in %i usecs\n", count0, usec);
printf("%i ioctls per second\n",
(int)(count0 * 1000LL * 1000LL / usec));
exit(0);
}
/*
* Trivial performance test for irq management
*
* Copyright (C) 2010 Alessandro Rubini <rubini@gnudd.com>
* Released according to the GNU GPL, version 2 or any later version.
*
* This work is part of the White Rabbit project and has been sponsored
* by CERN, the European Institute for Nuclear Research.
*/
#include <stdio.h>
#include <unistd.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <fcntl.h>
#include <sys/time.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <sys/ioctl.h>
#include "rawrabbit.h"
#define DEVNAME "/dev/rawrabbit"
#define ARRAY_SIZE(arr) (sizeof(arr) / sizeof((arr)[0]))
struct rr_devsel devsel = {
.vendor = 0x109e,
.device = 0x036e,
.subvendor = RR_DEVSEL_UNUSED,
.bus = RR_DEVSEL_UNUSED,
};
#define ENA_VAL 0x02
#define ENA_REG (__RR_SET_BAR(0) | 0x104)
#define ACK_REG (__RR_SET_BAR(0) | 0x100)
struct rr_iocmd iocmd = {
.datasize = 4,
};
int main(int argc, char **argv)
{
int fd, count, count0, nsec;
unsigned long long total = 0LL;
if (argc != 2) {
fprintf(stderr, "%s: use \"%s <count>\"\n", argv[0], argv[0]);
exit(1);
}
count0 = count = atoi(argv[1]);
if (!count) {
fprintf(stderr, "%s: not a number \"%s\"\n", argv[0], argv[1]);
exit(1);
}
fd = open(DEVNAME, O_RDWR);
if (fd < 0) {
fprintf(stderr, "%s: %s: %s\n", argv[0], DEVNAME,
strerror(errno));
exit(1);
}
/* choose the 878 device */
if (ioctl(fd, RR_DEVSEL, &devsel) < 0) {
fprintf(stderr, "%s: %s: ioctl: %s\n", argv[0], DEVNAME,
strerror(errno));
exit(1);
}
/* enable */
iocmd.address = ENA_REG;
iocmd.data32 = ENA_VAL;
if (ioctl(fd, RR_WRITE, &iocmd) < 0) {
fprintf(stderr, "%s: %s: ioctl: %s\n", argv[0], DEVNAME,
strerror(errno));
exit(1);
}
iocmd.address = ACK_REG;
while (count) {
nsec = ioctl(fd, RR_IRQWAIT);
if (nsec < 0) {
if (errno == EAGAIN) {
ioctl(fd, RR_IRQENA);
continue;
}
fprintf(stderr, "%s: %s: ioctl: %s\n", argv[0], DEVNAME,
strerror(errno));
exit(1); /* Argh! */
}
count--;
/* ack: this must work */
ioctl(fd, RR_WRITE, &iocmd);
nsec = ioctl(fd, RR_IRQENA, &iocmd);
if (nsec < 0) {
fprintf(stderr, "%s: %s: ioctl: %s\n", argv[0], DEVNAME,
strerror(errno));
/* Hmm... */
} else {
total += nsec;
}
}
/* now disable and then acknowledge */
iocmd.address = ENA_REG;
iocmd.data32 = 0;
ioctl(fd, RR_WRITE, &iocmd);
iocmd.address = ACK_REG;
iocmd.data32 = ~0;
ioctl(fd, RR_WRITE, &iocmd);
printf("got %i interrupts, average delay %lins\n", count0,
(long)(total / count0));
exit(0);
}
/*
* Trivial performance test for read(2)/write(2) I/O
*
* Copyright (C) 2010 Alessandro Rubini <rubini@gnudd.com>
* Released according to the GNU GPL, version 2 or any later version.
*
* This work is part of the White Rabbit project and has been sponsored
* by CERN, the European Institute for Nuclear Research.
*/
#include <stdio.h>
#include <unistd.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <fcntl.h>
#include <stdint.h>
#include <sys/time.h>
#include <sys/types.h>
#include <sys/stat.h>
#include "rawrabbit.h"
#define DEVNAME "/dev/rawrabbit"
int main(int argc, char **argv)
{
int fd, count, count0, usec;
struct timeval tv1, tv2;
if (argc != 2) {
fprintf(stderr, "%s: use \"%s <count>\"\n", argv[0], argv[0]);
exit(1);
}
count0 = count = atoi(argv[1]);
if (!count) {
fprintf(stderr, "%s: not a number \"%s\"\n", argv[0], argv[1]);
exit(1);
}
fd = open(DEVNAME, O_RDWR);
if (fd < 0) {
fprintf(stderr, "%s: %s: %s\n", argv[0], DEVNAME,
strerror(errno));
exit(1);
}
/* write */
lseek(fd, __RR_SET_BAR(4) | 0xa08, SEEK_SET);
gettimeofday(&tv1, NULL);
while (count--) {
static uint32_t values[] = {0x0000, 0xf000};
write(fd, values + (count & 1), sizeof(values[0]));
lseek(fd, -sizeof(values[0]), SEEK_CUR);
}
gettimeofday(&tv2, NULL);
usec = (tv2.tv_sec - tv1.tv_sec) * 1000 * 1000
+ tv2.tv_usec - tv1.tv_usec;
printf("%i writes in %i usecs\n", count0, usec);
printf("%i writes per second\n",
(int)(count0 * 1000LL * 1000LL / usec));
/* read: we cut and paste the code, oh so lazy */
count = count0;
lseek(fd, __RR_SET_BAR(4) | 0xa08, SEEK_SET);
gettimeofday(&tv1, NULL);
while (count--) {
static uint32_t value;
read(fd, &value, sizeof(value));
lseek(fd, -sizeof(value), SEEK_CUR);
}
gettimeofday(&tv2, NULL);
usec = (tv2.tv_sec - tv1.tv_sec) * 1000 * 1000
+ tv2.tv_usec - tv1.tv_usec;
printf("%i reads in %i usecs\n", count0, usec);
printf("%i reads per second\n",
(int)(count0 * 1000LL * 1000LL / usec));
exit(0);
}
#
# Makefile for the documentation directory
#
# Copyright 1994,2000,2010 Alessandro Rubini <rubini@linux.it>
#
#################
#
# BE CAREFUL in editing:
# due to the large number of index files, and my use of a non standard
# info input file, any file $(TARGET).* is removed by "make clean"
#
# I chose to use a prefix for the input file ("doc.$(TARGET)"), to ease
# makeing clean and applying my own rules.
#
###################################################################
TARGET = gnurabbit
# Assume makeinfo can do images and --html.
# In any case, MAKEINFO can be specified on the commandline
MAKEINFO = makeinfo
##############################################
INPUT = $(wildcard *.in)
TEXI = $(INPUT:.in=.texi)
.SUFFIXES: .in .texi .info .html .txt
.in.texi:
@rm -f $@ 2> /dev/null
sed -f ./infofilter $< > $@
chmod -w $@
# unfortuantely implicit rules are not concatenated, so force a make run
%.pdf: %.texi $(TEXI)
$(MAKE) $(TEXI)
texi2pdf --batch $<
%.info: %.texi $(TEXI)
$(MAKE) $(TEXI)
$(MAKEINFO) $< -o $@
%.html: %.texi $(TEXI)
$(MAKE) $(TEXI)
$(MAKEINFO) --html --no-split -o $@ $<
%.txt: %.texi $(TEXI)
$(MAKE) $(TEXI)
$(MAKEINFO) --no-headers $< > $@
##############################################
ALL = $(TARGET).info $(TARGET).txt $(TARGET).html $(TARGET).pdf
all: images $(TEXI) $(ALL)
images::
if [ -d images ]; then $(MAKE) -C images || exit 1; fi
info: $(TARGET).info
check: _err.ps
gs -sDEVICE=linux -r320x200x16 $<
terse:
for n in cp fn ky pg toc tp vr; do \
rm -f $(TARGET).$$n; \
done
rm -f *~
clean: terse
rm -f $(ALL) $(TEXI)
install:
@xrdef{Top-title}{Introduction}
@xrdef{Top-snt}{}
@xrdef{Driver for GN4124-title}{Driver for GN4124}
@xrdef{Driver for GN4124-snt}{Chapter@tie 1}
@xrdef{Raw PCI I/O-title}{Raw PCI I/O}
@xrdef{Raw PCI I/O-snt}{Chapter@tie 2}
@xrdef{General features of rawrabbit-title}{General features of rawrabbit}
@xrdef{General features of rawrabbit-snt}{Section@tie 2.1}
@xrdef{Top-pg}{1}
@xrdef{Driver for GN4124-pg}{1}
@xrdef{Raw PCI I/O-pg}{1}
@xrdef{General features of rawrabbit-pg}{1}
@xrdef{Interrupt management-title}{Interrupt management}
@xrdef{Interrupt management-snt}{Section@tie 2.2}
@xrdef{Bugs and misfeatures-title}{Bugs and misfeatures}
@xrdef{Bugs and misfeatures-snt}{Section@tie 2.3}
@xrdef{The DMA buffer-title}{The DMA buffer}
@xrdef{The DMA buffer-snt}{Section@tie 2.4}
@xrdef{Interrupt management-pg}{2}
@xrdef{Bugs and misfeatures-pg}{2}
@xrdef{The DMA buffer-pg}{2}
@xrdef{System calls implemented-title}{System calls implemented}
@xrdef{System calls implemented-snt}{Section@tie 2.5}
@xrdef{System calls implemented-pg}{3}
@xrdef{Ioctl commands-title}{Ioctl commands}
@xrdef{Ioctl commands-snt}{Section@tie 2.6}
@xrdef{Ioctl commands-pg}{4}
@xrdef{User space demo programs-title}{User space demo programs}
@xrdef{User space demo programs-snt}{Section@tie 2.7}
@xrdef{rrcmd-title}{rrcmd}
@xrdef{rrcmd-snt}{Section@tie 2.7.1}
@xrdef{User space demo programs-pg}{5}
@xrdef{rrcmd-pg}{5}
@xrdef{User space benchmarks-title}{User space benchmarks}
@xrdef{User space benchmarks-snt}{Section@tie 2.8}
@xrdef{bench/ioctl-title}{bench/ioctl}
@xrdef{bench/ioctl-snt}{Section@tie 2.8.1}
@xrdef{bench/irq878-title}{bench/irq878}
@xrdef{bench/irq878-snt}{Section@tie 2.8.2}
@xrdef{Benchmarking read and write-title}{Benchmarking read and write}
@xrdef{Benchmarking read and write-snt}{Section@tie 2.8.3}
@xrdef{User space benchmarks-pg}{7}
@xrdef{bench/ioctl-pg}{7}
@xrdef{bench/irq878-pg}{7}
@xrdef{Benchmarking read and write-pg}{7}
This diff is collapsed.
This is pdfTeX, Version 3.1415926-1.40.10 (TeX Live 2009/Debian) (format=pdfetex 2010.5.7) 26 AUG 2010 08:59
entering extended mode
restricted \write18 enabled.
file:line:error style messages enabled.
%&-line parsing enabled.
**\nonstopmode \catcode126=12 \def\normaltilde{~}\catcode126=13 \let~\normaltil
de \input ./gnurabbit.texi
(./gnurabbit.texi)
! Emergency stop.
<*> ... \let~\normaltilde \input ./gnurabbit.texi
*** (job aborted, no legal \end found)
! ==> Fatal error occurred, no output PDF file produced!
#! /usr/bin/sed -f
# allow "%" as a comment char, but only at the beginning of the line
s/^%/@c /
#s/[^\\]%.*$//
s/^\\%/%/
# turn accents into tex encoding (for Italian language)
s/a`//g
s/e`//g
s/E`//g
s/i`//g
s/o`//g
s/u`//g
s/erch/erch/g
s/oich/oich/g
s/n/n/g
s//@`a/g
s//@`e/g
s//@'e/g
s//@`i/g
s//@`o/g
s//@`E/g
s//@`u/g
#preserve blanks, braces and @ in @example and @smallexample blocks
/@example/,/@end example/ s/{/@{/g
/@example/,/@end example/ s/}/@}/g
/@example/,/@end example/ s/@/@@/g
s/^@@example/@example/
s/^@@end/@end/
/@example/,/@end example/ p
/@example/,/@end example/ d
/@smallexample/,/@end smallexample/ s/{/@{/g
/@smallexample/,/@end smallexample/ s/}/@}/g
/@smallexample/,/@end smallexample/ s/@/@@/g
s/^@@smallexample/@smallexample/
s/^@@end/@end/
/@smallexample/,/@end smallexample/ p
/@smallexample/,/@end smallexample/ d
# remove leading blanks
s/^[ ]*//
# fix include to include texi not in
s/^\(.include.*\).in$/\1.texi/
#LINUX ?= /lib/modules/$(shell uname -r)/build
CPU=L865
KVER=2.6.24.7-rt27
LINUX=/acc/sys/$(CPU)/usr/src/kernels/$(KVER)/
obj-m = rawrabbit.o
all modules:
$(MAKE) -C $(LINUX) M=$(shell /bin/pwd) modules
clean:
rm -rf *.o *~ .*.cmd *.ko *.mod.c .tmp_versions Module.symvers \
Module.markers modules.order
/* Simple compatibility macros */
#ifdef CONFIG_X86
/* Readq for IA32 introduced in 2.6.28-rc7 */
#ifndef readq
static inline __u64 readq(const volatile void __iomem *addr)
{
const volatile u32 __iomem *p = addr;
u32 low, high;
low = readl(p);
high = readl(p + 1);
return low + ((u64)high << 32);
}
static inline void writeq(__u64 val, volatile void __iomem *addr)
{
writel(val, addr);
writel(val >> 32, addr+4);
}
#endif
#endif /* X86 */
/* Hack... something I sometimes need */
static inline void dumpstruct(char *name, void *ptr, int size)
{
int i;
unsigned char *p = ptr;
printk("dump %s at %p (size 0x%x)\n", name, ptr, size);
for (i = 0; i < size; ) {
printk("%02x", p[i]);
i++;
printk(i & 3 ? " " : i & 0xf ? " " : "\n");
}
if (i & 0xf)
printk("\n");
}
This diff is collapsed.
/*
* Public header for the raw I/O interface for PCI or PCI express interfaces
*
* Copyright (C) 2010 Alessandro Rubini <rubini@gnudd.com>
* Released according to the GNU GPL, version 2 or any later version.
*
* This work is part of the White Rabbit project and has been sponsored
* by CERN, the European Institute for Nuclear Research.
*/
#ifndef __RAWRABBIT_H__
#define __RAWRABBIT_H__
#include <linux/types.h>
#include <linux/ioctl.h>
#ifdef __KERNEL__ /* The initial part of the file is driver-internal stuff */
#include <linux/pci.h>
#include <linux/spinlock.h>
#include <linux/completion.h>
#include <linux/wait.h>
struct rr_devsel;
struct rr_dev {
struct rr_devsel *devsel;
struct pci_driver *pci_driver;
struct pci_device_id *id_table;
struct pci_dev *pdev; /* non-null after pciprobe */
spinlock_t lock;
wait_queue_head_t q;
void *dmabuf;
struct timespec irqtime;
unsigned long irqcount;
struct completion complete;
struct resource *area[3]; /* bar 0, 2, 4 */
void *remap[3]; /* ioremap of bar 0, 2, 4 */
unsigned long flags;
int usecount;
};
#define RR_FLAG_REGISTERED 0x00000001
#define RR_FLAG_IRQDISABLE 0x00000002
#define RR_FLAG_IRQREQUEST 0x00000002
#define RR_PROBE_TIMEOUT (HZ/10) /* for pci_register_drv */
#endif /* __KERNEL__ */
/* By default, the driver registers for this vendor/devid */
#define RR_DEFAULT_VENDOR 0x1a39
#define RR_DEFAULT_DEVICE 0x0004
#define RR_DEFAULT_BUFSIZE (1<<20) /* 1MB */
#define RR_PLIST_SIZE 4096 /* no PAGE_SIZE in user space */
#define RR_PLIST_LEN (RR_PLIST_SIZE / sizeof(void *))
#define RR_MAX_BUFSIZE (RR_PLIST_SIZE * RR_PLIST_LEN)
/* This structure is used to select the device to be accessed, via ioctl */
struct rr_devsel {
__u16 vendor;
__u16 device;
__u16 subvendor; /* RR_DEVSEL_UNUSED to ignore subvendor/dev */
__u16 subdevice;
__u16 bus; /* RR_DEVSEL_UNUSED to ignore bus and devfn */
__u16 devfn;
};
#define RR_DEVSEL_UNUSED 0xffff
/* Offsets for BAR areas in llseek() and/or ioctl */
#define RR_BAR_0 0x00000000
#define RR_BAR_2 0x20000000
#define RR_BAR_4 0x40000000
#define RR_BAR_BUF 0xc0000000 /* The DMA buffer */
#define RR_IS_DMABUF(addr) ((addr) >= RR_BAR_BUF)
#define __RR_GET_BAR(x) ((x) >> 28)
#define __RR_SET_BAR(x) ((x) << 28)
#define __RR_GET_OFF(x) ((x) & 0x0fffffff)
static inline int rr_is_valid_bar(unsigned long address)
{
int bar = __RR_GET_BAR(address);
return bar == 0 || bar == 2 || bar == 4 || bar == 0x0c;
}
static inline int rr_is_dmabuf_bar(unsigned long address)
{
int bar = __RR_GET_BAR(address);
return bar == 0x0c;
}
struct rr_iocmd {
__u32 address; /* bar and offset */
__u32 datasize; /* 1 or 2 or 4 or 8 */
union {
__u8 data8;
__u16 data16;
__u32 data32;
__u64 data64;
};
};
/* ioctl commands */
#define __RR_IOC_MAGIC '4' /* random or so */
#define RR_DEVSEL _IOW(__RR_IOC_MAGIC, 0, struct rr_devsel)
#define RR_DEVGET _IOR(__RR_IOC_MAGIC, 1, struct rr_devsel)
#define RR_READ _IOWR(__RR_IOC_MAGIC, 2, struct rr_iocmd)
#define RR_WRITE _IOW(__RR_IOC_MAGIC, 3, struct rr_iocmd)
#define RR_IRQWAIT _IO(__RR_IOC_MAGIC, 4)
#define RR_IRQENA _IO(__RR_IOC_MAGIC, 5)
#define RR_GETDMASIZE _IO(__RR_IOC_MAGIC, 6)
/* #define RR_SETDMASIZE _IO(__RR_IOC_MAGIC, 7, unsigned long) */
#define RR_GETPLIST _IO(__RR_IOC_MAGIC, 8) /* returns a whole page */
#define VFAT_IOCTL_READDIR_BOTH _IOR('r', 1, struct dirent [2])
#endif /* __RAWRABBIT_H__ */
CFLAGS = -Wall -ggdb -I../kernel
AS = $(CROSS_COMPILE)as
LD = $(CROSS_COMPILE)ld
CC = $(CROSS_COMPILE)gcc
CPP = $(CC) -E
AR = $(CROSS_COMPILE)ar
NM = $(CROSS_COMPILE)nm
STRIP = $(CROSS_COMPILE)strip
OBJCOPY = $(CROSS_COMPILE)objcopy
OBJDUMP = $(CROSS_COMPILE)objdump
ALL = rrlib.so
all: $(ALL)
rrlib.so: rrlib.o
$(CC) $(CFLAGS) -o $@ -shared $^
clean:
rm -f $(ALL) *.o *~ *.so *.pyc
This diff is collapsed.
######################################################################
# This file should be kept compatible with Python 2.3, see PEP 291. #
######################################################################
import sys
from ctypes import *
_array_type = type(c_int * 3)
def _other_endian(typ):
"""Return the type with the 'other' byte order. Simple types like
c_int and so on already have __ctype_be__ and __ctype_le__
attributes which contain the types, for more complicated types
only arrays are supported.
"""
try:
return getattr(typ, _OTHER_ENDIAN)
except AttributeError:
if type(typ) == _array_type:
return _other_endian(typ._type_) * typ._length_
raise TypeError("This type does not support other endian: %s" % typ)
class _swapped_meta(type(Structure)):
def __setattr__(self, attrname, value):
if attrname == "_fields_":
fields = []
for desc in value:
name = desc[0]
typ = desc[1]
rest = desc[2:]
fields.append((name, _other_endian(typ)) + rest)
value = fields
super(_swapped_meta, self).__setattr__(attrname, value)
################################################################
# Note: The Structure metaclass checks for the *presence* (not the
# value!) of a _swapped_bytes_ attribute to determine the bit order in
# structures containing bit fields.
if sys.byteorder == "little":
_OTHER_ENDIAN = "__ctype_be__"
LittleEndianStructure = Structure
class BigEndianStructure(Structure):
"""Structure with big endian byte order"""
__metaclass__ = _swapped_meta
_swappedbytes_ = None
elif sys.byteorder == "big":
_OTHER_ENDIAN = "__ctype_le__"
BigEndianStructure = Structure
class LittleEndianStructure(Structure):
"""Structure with little endian byte order"""
__metaclass__ = _swapped_meta
_swappedbytes_ = None
else:
raise RuntimeError("Invalid byteorder")
######################################################################
# This file should be kept compatible with Python 2.3, see PEP 291. #
######################################################################
"""
Enough Mach-O to make your head spin.
See the relevant header files in /usr/include/mach-o
And also Apple's documentation.
"""
__version__ = '1.0'
######################################################################
# This file should be kept compatible with Python 2.3, see PEP 291. #
######################################################################
"""
dyld emulation
"""
import os
from framework import framework_info
from dylib import dylib_info
from itertools import *
__all__ = [
'dyld_find', 'framework_find',
'framework_info', 'dylib_info',
]
# These are the defaults as per man dyld(1)
#
DEFAULT_FRAMEWORK_FALLBACK = [
os.path.expanduser("~/Library/Frameworks"),
"/Library/Frameworks",
"/Network/Library/Frameworks",
"/System/Library/Frameworks",
]
DEFAULT_LIBRARY_FALLBACK = [
os.path.expanduser("~/lib"),
"/usr/local/lib",
"/lib",
"/usr/lib",
]
def ensure_utf8(s):
"""Not all of PyObjC and Python understand unicode paths very well yet"""
if isinstance(s, unicode):
return s.encode('utf8')
return s
def dyld_env(env, var):
if env is None:
env = os.environ
rval = env.get(var)
if rval is None:
return []
return rval.split(':')
def dyld_image_suffix(env=None):
if env is None:
env = os.environ
return env.get('DYLD_IMAGE_SUFFIX')
def dyld_framework_path(env=None):
return dyld_env(env, 'DYLD_FRAMEWORK_PATH')
def dyld_library_path(env=None):
return dyld_env(env, 'DYLD_LIBRARY_PATH')
def dyld_fallback_framework_path(env=None):
return dyld_env(env, 'DYLD_FALLBACK_FRAMEWORK_PATH')
def dyld_fallback_library_path(env=None):
return dyld_env(env, 'DYLD_FALLBACK_LIBRARY_PATH')
def dyld_image_suffix_search(iterator, env=None):
"""For a potential path iterator, add DYLD_IMAGE_SUFFIX semantics"""
suffix = dyld_image_suffix(env)
if suffix is None:
return iterator
def _inject(iterator=iterator, suffix=suffix):
for path in iterator:
if path.endswith('.dylib'):
yield path[:-len('.dylib')] + suffix + '.dylib'
else:
yield path + suffix
yield path
return _inject()
def dyld_override_search(name, env=None):
# If DYLD_FRAMEWORK_PATH is set and this dylib_name is a
# framework name, use the first file that exists in the framework
# path if any. If there is none go on to search the DYLD_LIBRARY_PATH
# if any.
framework = framework_info(name)
if framework is not None:
for path in dyld_framework_path(env):
yield os.path.join(path, framework['name'])
# If DYLD_LIBRARY_PATH is set then use the first file that exists
# in the path. If none use the original name.
for path in dyld_library_path(env):
yield os.path.join(path, os.path.basename(name))
def dyld_executable_path_search(name, executable_path=None):
# If we haven't done any searching and found a library and the
# dylib_name starts with "@executable_path/" then construct the
# library name.
if name.startswith('@executable_path/') and executable_path is not None:
yield os.path.join(executable_path, name[len('@executable_path/'):])
def dyld_default_search(name, env=None):
yield name
framework = framework_info(name)
if framework is not None:
fallback_framework_path = dyld_fallback_framework_path(env)
for path in fallback_framework_path:
yield os.path.join(path, framework['name'])
fallback_library_path = dyld_fallback_library_path(env)
for path in fallback_library_path:
yield os.path.join(path, os.path.basename(name))
if framework is not None and not fallback_framework_path:
for path in DEFAULT_FRAMEWORK_FALLBACK:
yield os.path.join(path, framework['name'])
if not fallback_library_path:
for path in DEFAULT_LIBRARY_FALLBACK:
yield os.path.join(path, os.path.basename(name))
def dyld_find(name, executable_path=None, env=None):
"""
Find a library or framework using dyld semantics
"""
name = ensure_utf8(name)
executable_path = ensure_utf8(executable_path)
for path in dyld_image_suffix_search(chain(
dyld_override_search(name, env),
dyld_executable_path_search(name, executable_path),
dyld_default_search(name, env),
), env):
if os.path.isfile(path):
return path
raise ValueError, "dylib %s could not be found" % (name,)
def framework_find(fn, executable_path=None, env=None):
"""
Find a framework using dyld semantics in a very loose manner.
Will take input such as:
Python
Python.framework
Python.framework/Versions/Current
"""
try:
return dyld_find(fn, executable_path=executable_path, env=env)
except ValueError, e:
pass
fmwk_index = fn.rfind('.framework')
if fmwk_index == -1:
fmwk_index = len(fn)
fn += '.framework'
fn = os.path.join(fn, os.path.basename(fn[:fmwk_index]))
try:
return dyld_find(fn, executable_path=executable_path, env=env)
except ValueError:
raise e
def test_dyld_find():
env = {}
assert dyld_find('libSystem.dylib') == '/usr/lib/libSystem.dylib'
assert dyld_find('System.framework/System') == '/System/Library/Frameworks/System.framework/System'
if __name__ == '__main__':
test_dyld_find()
######################################################################
# This file should be kept compatible with Python 2.3, see PEP 291. #
######################################################################
"""
Generic dylib path manipulation
"""
import re
__all__ = ['dylib_info']
DYLIB_RE = re.compile(r"""(?x)
(?P<location>^.*)(?:^|/)
(?P<name>
(?P<shortname>\w+?)
(?:\.(?P<version>[^._]+))?
(?:_(?P<suffix>[^._]+))?
\.dylib$
)
""")
def dylib_info(filename):
"""
A dylib name can take one of the following four forms:
Location/Name.SomeVersion_Suffix.dylib
Location/Name.SomeVersion.dylib
Location/Name_Suffix.dylib
Location/Name.dylib
returns None if not found or a mapping equivalent to:
dict(
location='Location',
name='Name.SomeVersion_Suffix.dylib',
shortname='Name',
version='SomeVersion',
suffix='Suffix',
)
Note that SomeVersion and Suffix are optional and may be None
if not present.
"""
is_dylib = DYLIB_RE.match(filename)
if not is_dylib:
return None
return is_dylib.groupdict()
def test_dylib_info():
def d(location=None, name=None, shortname=None, version=None, suffix=None):
return dict(
location=location,
name=name,
shortname=shortname,
version=version,
suffix=suffix
)
assert dylib_info('completely/invalid') is None
assert dylib_info('completely/invalide_debug') is None
assert dylib_info('P/Foo.dylib') == d('P', 'Foo.dylib', 'Foo')
assert dylib_info('P/Foo_debug.dylib') == d('P', 'Foo_debug.dylib', 'Foo', suffix='debug')
assert dylib_info('P/Foo.A.dylib') == d('P', 'Foo.A.dylib', 'Foo', 'A')
assert dylib_info('P/Foo_debug.A.dylib') == d('P', 'Foo_debug.A.dylib', 'Foo_debug', 'A')
assert dylib_info('P/Foo.A_debug.dylib') == d('P', 'Foo.A_debug.dylib', 'Foo', 'A', 'debug')
if __name__ == '__main__':
test_dylib_info()
######################################################################
# This file should be kept compatible with Python 2.3, see PEP 291. #
######################################################################
"""
Generic framework path manipulation
"""
import re
__all__ = ['framework_info']
STRICT_FRAMEWORK_RE = re.compile(r"""(?x)
(?P<location>^.*)(?:^|/)
(?P<name>
(?P<shortname>\w+).framework/
(?:Versions/(?P<version>[^/]+)/)?
(?P=shortname)
(?:_(?P<suffix>[^_]+))?
)$
""")
def framework_info(filename):
"""
A framework name can take one of the following four forms:
Location/Name.framework/Versions/SomeVersion/Name_Suffix
Location/Name.framework/Versions/SomeVersion/Name
Location/Name.framework/Name_Suffix
Location/Name.framework/Name
returns None if not found, or a mapping equivalent to:
dict(
location='Location',
name='Name.framework/Versions/SomeVersion/Name_Suffix',
shortname='Name',
version='SomeVersion',
suffix='Suffix',
)
Note that SomeVersion and Suffix are optional and may be None
if not present
"""
is_framework = STRICT_FRAMEWORK_RE.match(filename)
if not is_framework:
return None
return is_framework.groupdict()
def test_framework_info():
def d(location=None, name=None, shortname=None, version=None, suffix=None):
return dict(
location=location,
name=name,
shortname=shortname,
version=version,
suffix=suffix
)
assert framework_info('completely/invalid') is None
assert framework_info('completely/invalid/_debug') is None
assert framework_info('P/F.framework') is None
assert framework_info('P/F.framework/_debug') is None
assert framework_info('P/F.framework/F') == d('P', 'F.framework/F', 'F')
assert framework_info('P/F.framework/F_debug') == d('P', 'F.framework/F_debug', 'F', suffix='debug')
assert framework_info('P/F.framework/Versions') is None
assert framework_info('P/F.framework/Versions/A') is None
assert framework_info('P/F.framework/Versions/A/F') == d('P', 'F.framework/Versions/A/F', 'F', 'A')
assert framework_info('P/F.framework/Versions/A/F_debug') == d('P', 'F.framework/Versions/A/F_debug', 'F', 'A', 'debug')
if __name__ == '__main__':
test_framework_info()
import glob, os, sys, unittest, getopt, time
use_resources = []
class ResourceDenied(Exception):
"""Test skipped because it requested a disallowed resource.
This is raised when a test calls requires() for a resource that
has not be enabled. Resources are defined by test modules.
"""
def is_resource_enabled(resource):
"""Test whether a resource is enabled.
If the caller's module is __main__ then automatically return True."""
if sys._getframe().f_back.f_globals.get("__name__") == "__main__":
return True
result = use_resources is not None and \
(resource in use_resources or "*" in use_resources)
if not result:
_unavail[resource] = None
return result
_unavail = {}
def requires(resource, msg=None):
"""Raise ResourceDenied if the specified resource is not available.
If the caller's module is __main__ then automatically return True."""
# see if the caller's module is __main__ - if so, treat as if
# the resource was set
if sys._getframe().f_back.f_globals.get("__name__") == "__main__":
return
if not is_resource_enabled(resource):
if msg is None:
msg = "Use of the `%s' resource not enabled" % resource
raise ResourceDenied(msg)
def find_package_modules(package, mask):
import fnmatch
if hasattr(package, "__loader__"):
path = package.__name__.replace(".", os.path.sep)
mask = os.path.join(path, mask)
for fnm in package.__loader__._files.iterkeys():
if fnmatch.fnmatchcase(fnm, mask):
yield os.path.splitext(fnm)[0].replace(os.path.sep, ".")
else:
path = package.__path__[0]
for fnm in os.listdir(path):
if fnmatch.fnmatchcase(fnm, mask):
yield "%s.%s" % (package.__name__, os.path.splitext(fnm)[0])
def get_tests(package, mask, verbosity):
"""Return a list of skipped test modules, and a list of test cases."""
tests = []
skipped = []
for modname in find_package_modules(package, mask):
try:
mod = __import__(modname, globals(), locals(), ['*'])
except ResourceDenied, detail:
skipped.append(modname)
if verbosity > 1:
print >> sys.stderr, "Skipped %s: %s" % (modname, detail)
continue
except Exception, detail:
print >> sys.stderr, "Warning: could not import %s: %s" % (modname, detail)
continue
for name in dir(mod):
if name.startswith("_"):
continue
o = getattr(mod, name)
if type(o) is type(unittest.TestCase) and issubclass(o, unittest.TestCase):
tests.append(o)
return skipped, tests
def usage():
print __doc__
return 1
def test_with_refcounts(runner, verbosity, testcase):
"""Run testcase several times, tracking reference counts."""
import gc
import ctypes
ptc = ctypes._pointer_type_cache.copy()
cfc = ctypes._c_functype_cache.copy()
wfc = ctypes._win_functype_cache.copy()
# when searching for refcount leaks, we have to manually reset any
# caches that ctypes has.
def cleanup():
ctypes._pointer_type_cache = ptc.copy()
ctypes._c_functype_cache = cfc.copy()
ctypes._win_functype_cache = wfc.copy()
gc.collect()
test = unittest.makeSuite(testcase)
for i in range(5):
rc = sys.gettotalrefcount()
runner.run(test)
cleanup()
COUNT = 5
refcounts = [None] * COUNT
for i in range(COUNT):
rc = sys.gettotalrefcount()
runner.run(test)
cleanup()
refcounts[i] = sys.gettotalrefcount() - rc
if filter(None, refcounts):
print "%s leaks:\n\t" % testcase, refcounts
elif verbosity:
print "%s: ok." % testcase
class TestRunner(unittest.TextTestRunner):
def run(self, test, skipped):
"Run the given test case or test suite."
# Same as unittest.TextTestRunner.run, except that it reports
# skipped tests.
result = self._makeResult()
startTime = time.time()
test(result)
stopTime = time.time()
timeTaken = stopTime - startTime
result.printErrors()
self.stream.writeln(result.separator2)
run = result.testsRun
if _unavail: #skipped:
requested = _unavail.keys()
requested.sort()
self.stream.writeln("Ran %d test%s in %.3fs (%s module%s skipped)" %
(run, run != 1 and "s" or "", timeTaken,
len(skipped),
len(skipped) != 1 and "s" or ""))
self.stream.writeln("Unavailable resources: %s" % ", ".join(requested))
else:
self.stream.writeln("Ran %d test%s in %.3fs" %
(run, run != 1 and "s" or "", timeTaken))
self.stream.writeln()
if not result.wasSuccessful():
self.stream.write("FAILED (")
failed, errored = map(len, (result.failures, result.errors))
if failed:
self.stream.write("failures=%d" % failed)
if errored:
if failed: self.stream.write(", ")
self.stream.write("errors=%d" % errored)
self.stream.writeln(")")
else:
self.stream.writeln("OK")
return result
def main(*packages):
try:
opts, args = getopt.getopt(sys.argv[1:], "rqvu:")
except getopt.error:
return usage()
verbosity = 1
search_leaks = False
for flag, value in opts:
if flag == "-q":
verbosity -= 1
elif flag == "-v":
verbosity += 1
elif flag == "-r":
try:
sys.gettotalrefcount
except AttributeError:
print >> sys.stderr, "-r flag requires Python debug build"
return -1
search_leaks = True
elif flag == "-u":
use_resources.extend(value.split(","))
mask = "test_*.py"
if args:
mask = args[0]
for package in packages:
run_tests(package, mask, verbosity, search_leaks)
def run_tests(package, mask, verbosity, search_leaks):
skipped, testcases = get_tests(package, mask, verbosity)
runner = TestRunner(verbosity=verbosity)
suites = [unittest.makeSuite(o) for o in testcases]
suite = unittest.TestSuite(suites)
result = runner.run(suite, skipped)
if search_leaks:
# hunt for refcount leaks
runner = BasicTestRunner()
for t in testcases:
test_with_refcounts(runner, verbosity, t)
return bool(result.errors)
class BasicTestRunner:
def run(self, test):
result = unittest.TestResult()
test(result)
return result
"""Usage: runtests.py [-q] [-r] [-v] [-u resources] [mask]
Run all tests found in this directory, and print a summary of the results.
Command line flags:
-q quiet mode: don't prnt anything while the tests are running
-r run tests repeatedly, look for refcount leaks
-u<resources>
Add resources to the lits of allowed resources. '*' allows all
resources.
-v verbose mode: print the test currently executed
mask mask to select filenames containing testcases, wildcards allowed
"""
import sys
import ctypes.test
if __name__ == "__main__":
sys.exit(ctypes.test.main(ctypes.test))
import unittest
from ctypes import *
class AnonTest(unittest.TestCase):
def test_anon(self):
class ANON(Union):
_fields_ = [("a", c_int),
("b", c_int)]
class Y(Structure):
_fields_ = [("x", c_int),
("_", ANON),
("y", c_int)]
_anonymous_ = ["_"]
self.failUnlessEqual(Y.a.offset, sizeof(c_int))
self.failUnlessEqual(Y.b.offset, sizeof(c_int))
self.failUnlessEqual(ANON.a.offset, 0)
self.failUnlessEqual(ANON.b.offset, 0)
def test_anon_nonseq(self):
# TypeError: _anonymous_ must be a sequence
self.failUnlessRaises(TypeError,
lambda: type(Structure)("Name",
(Structure,),
{"_fields_": [], "_anonymous_": 42}))
def test_anon_nonmember(self):
# AttributeError: type object 'Name' has no attribute 'x'
self.failUnlessRaises(AttributeError,
lambda: type(Structure)("Name",
(Structure,),
{"_fields_": [],
"_anonymous_": ["x"]}))
def test_nested(self):
class ANON_S(Structure):
_fields_ = [("a", c_int)]
class ANON_U(Union):
_fields_ = [("_", ANON_S),
("b", c_int)]
_anonymous_ = ["_"]
class Y(Structure):
_fields_ = [("x", c_int),
("_", ANON_U),
("y", c_int)]
_anonymous_ = ["_"]
self.failUnlessEqual(Y.x.offset, 0)
self.failUnlessEqual(Y.a.offset, sizeof(c_int))
self.failUnlessEqual(Y.b.offset, sizeof(c_int))
self.failUnlessEqual(Y._.offset, sizeof(c_int))
self.failUnlessEqual(Y.y.offset, sizeof(c_int) * 2)
if __name__ == "__main__":
unittest.main()
import unittest
from ctypes import *
from binascii import hexlify
import re
def dump(obj):
# helper function to dump memory contents in hex, with a hyphen
# between the bytes.
h = hexlify(buffer(obj))
return re.sub(r"(..)", r"\1-", h)[:-1]
class Value(Structure):
_fields_ = [("val", c_byte)]
class Container(Structure):
_fields_ = [("pvalues", POINTER(Value))]
class Test(unittest.TestCase):
def test(self):
# create an array of 4 values
val_array = (Value * 4)()
# create a container, which holds a pointer to the pvalues array.
c = Container()
c.pvalues = val_array
# memory contains 4 NUL bytes now, that's correct
self.failUnlessEqual("00-00-00-00", dump(val_array))
# set the values of the array through the pointer:
for i in range(4):
c.pvalues[i].val = i + 1
values = [c.pvalues[i].val for i in range(4)]
# These are the expected results: here s the bug!
self.failUnlessEqual(
(values, dump(val_array)),
([1, 2, 3, 4], "01-02-03-04")
)
def test_2(self):
val_array = (Value * 4)()
# memory contains 4 NUL bytes now, that's correct
self.failUnlessEqual("00-00-00-00", dump(val_array))
ptr = cast(val_array, POINTER(Value))
# set the values of the array through the pointer:
for i in range(4):
ptr[i].val = i + 1
values = [ptr[i].val for i in range(4)]
# These are the expected results: here s the bug!
self.failUnlessEqual(
(values, dump(val_array)),
([1, 2, 3, 4], "01-02-03-04")
)
if __name__ == "__main__":
unittest.main()
import unittest
from ctypes import *
formats = "bBhHiIlLqQfd"
formats = c_byte, c_ubyte, c_short, c_ushort, c_int, c_uint, \
c_long, c_ulonglong, c_float, c_double
class ArrayTestCase(unittest.TestCase):
def test_simple(self):
# create classes holding simple numeric types, and check
# various properties.
init = range(15, 25)
for fmt in formats:
alen = len(init)
int_array = ARRAY(fmt, alen)
ia = int_array(*init)
# length of instance ok?
self.failUnlessEqual(len(ia), alen)
# slot values ok?
values = [ia[i] for i in range(len(init))]
self.failUnlessEqual(values, init)
# change the items
from operator import setitem
new_values = range(42, 42+alen)
[setitem(ia, n, new_values[n]) for n in range(alen)]
values = [ia[i] for i in range(len(init))]
self.failUnlessEqual(values, new_values)
# are the items initialized to 0?
ia = int_array()
values = [ia[i] for i in range(len(init))]
self.failUnlessEqual(values, [0] * len(init))
# Too many in itializers should be caught
self.assertRaises(IndexError, int_array, *range(alen*2))
CharArray = ARRAY(c_char, 3)
ca = CharArray("a", "b", "c")
# Should this work? It doesn't:
# CharArray("abc")
self.assertRaises(TypeError, CharArray, "abc")
self.failUnlessEqual(ca[0], "a")
self.failUnlessEqual(ca[1], "b")
self.failUnlessEqual(ca[2], "c")
self.failUnlessEqual(ca[-3], "a")
self.failUnlessEqual(ca[-2], "b")
self.failUnlessEqual(ca[-1], "c")
self.failUnlessEqual(len(ca), 3)
# slicing is now supported, but not extended slicing (3-argument)!
from operator import getslice, delitem
self.assertRaises(TypeError, getslice, ca, 0, 1, -1)
# cannot delete items
self.assertRaises(TypeError, delitem, ca, 0)
def test_numeric_arrays(self):
alen = 5
numarray = ARRAY(c_int, alen)
na = numarray()
values = [na[i] for i in range(alen)]
self.failUnlessEqual(values, [0] * alen)
na = numarray(*[c_int()] * alen)
values = [na[i] for i in range(alen)]
self.failUnlessEqual(values, [0]*alen)
na = numarray(1, 2, 3, 4, 5)
values = [i for i in na]
self.failUnlessEqual(values, [1, 2, 3, 4, 5])
na = numarray(*map(c_int, (1, 2, 3, 4, 5)))
values = [i for i in na]
self.failUnlessEqual(values, [1, 2, 3, 4, 5])
def test_classcache(self):
self.failUnless(not ARRAY(c_int, 3) is ARRAY(c_int, 4))
self.failUnless(ARRAY(c_int, 3) is ARRAY(c_int, 3))
def test_from_address(self):
# Failed with 0.9.8, reported by JUrner
p = create_string_buffer("foo")
sz = (c_char * 3).from_address(addressof(p))
self.failUnlessEqual(sz[:], "foo")
self.failUnlessEqual(sz.value, "foo")
try:
create_unicode_buffer
except NameError:
pass
else:
def test_from_addressW(self):
p = create_unicode_buffer("foo")
sz = (c_wchar * 3).from_address(addressof(p))
self.failUnlessEqual(sz[:], "foo")
self.failUnlessEqual(sz.value, "foo")
if __name__ == '__main__':
unittest.main()
import unittest
from ctypes import *
import _ctypes_test
dll = CDLL(_ctypes_test.__file__)
try:
CALLBACK_FUNCTYPE = WINFUNCTYPE
except NameError:
# fake to enable this test on Linux
CALLBACK_FUNCTYPE = CFUNCTYPE
class POINT(Structure):
_fields_ = [("x", c_int), ("y", c_int)]
class BasicWrapTestCase(unittest.TestCase):
def wrap(self, param):
return param
def test_wchar_parm(self):
try:
c_wchar
except NameError:
return
f = dll._testfunc_i_bhilfd
f.argtypes = [c_byte, c_wchar, c_int, c_long, c_float, c_double]
result = f(self.wrap(1), self.wrap(u"x"), self.wrap(3), self.wrap(4), self.wrap(5.0), self.wrap(6.0))
self.failUnlessEqual(result, 139)
self.failUnless(type(result), int)
def test_pointers(self):
f = dll._testfunc_p_p
f.restype = POINTER(c_int)
f.argtypes = [POINTER(c_int)]
# This only works if the value c_int(42) passed to the
# function is still alive while the pointer (the result) is
# used.
v = c_int(42)
self.failUnlessEqual(pointer(v).contents.value, 42)
result = f(self.wrap(pointer(v)))
self.failUnlessEqual(type(result), POINTER(c_int))
self.failUnlessEqual(result.contents.value, 42)
# This on works...
result = f(self.wrap(pointer(v)))
self.failUnlessEqual(result.contents.value, v.value)
p = pointer(c_int(99))
result = f(self.wrap(p))
self.failUnlessEqual(result.contents.value, 99)
def test_shorts(self):
f = dll._testfunc_callback_i_if
args = []
expected = [262144, 131072, 65536, 32768, 16384, 8192, 4096, 2048,
1024, 512, 256, 128, 64, 32, 16, 8, 4, 2, 1]
def callback(v):
args.append(v)
return v
CallBack = CFUNCTYPE(c_int, c_int)
cb = CallBack(callback)
f(self.wrap(2**18), self.wrap(cb))
self.failUnlessEqual(args, expected)
################################################################
def test_callbacks(self):
f = dll._testfunc_callback_i_if
f.restype = c_int
MyCallback = CFUNCTYPE(c_int, c_int)
def callback(value):
#print "called back with", value
return value
cb = MyCallback(callback)
result = f(self.wrap(-10), self.wrap(cb))
self.failUnlessEqual(result, -18)
# test with prototype
f.argtypes = [c_int, MyCallback]
cb = MyCallback(callback)
result = f(self.wrap(-10), self.wrap(cb))
self.failUnlessEqual(result, -18)
result = f(self.wrap(-10), self.wrap(cb))
self.failUnlessEqual(result, -18)
AnotherCallback = CALLBACK_FUNCTYPE(c_int, c_int, c_int, c_int, c_int)
# check that the prototype works: we call f with wrong
# argument types
cb = AnotherCallback(callback)
self.assertRaises(ArgumentError, f, self.wrap(-10), self.wrap(cb))
def test_callbacks_2(self):
# Can also use simple datatypes as argument type specifiers
# for the callback function.
# In this case the call receives an instance of that type
f = dll._testfunc_callback_i_if
f.restype = c_int
MyCallback = CFUNCTYPE(c_int, c_int)
f.argtypes = [c_int, MyCallback]
def callback(value):
#print "called back with", value
self.failUnlessEqual(type(value), int)
return value
cb = MyCallback(callback)
result = f(self.wrap(-10), self.wrap(cb))
self.failUnlessEqual(result, -18)
def test_longlong_callbacks(self):
f = dll._testfunc_callback_q_qf
f.restype = c_longlong
MyCallback = CFUNCTYPE(c_longlong, c_longlong)
f.argtypes = [c_longlong, MyCallback]
def callback(value):
self.failUnless(isinstance(value, (int, long)))
return value & 0x7FFFFFFF
cb = MyCallback(callback)
self.failUnlessEqual(13577625587, int(f(self.wrap(1000000000000), self.wrap(cb))))
def test_byval(self):
# without prototype
ptin = POINT(1, 2)
ptout = POINT()
# EXPORT int _testfunc_byval(point in, point *pout)
result = dll._testfunc_byval(ptin, byref(ptout))
got = result, ptout.x, ptout.y
expected = 3, 1, 2
self.failUnlessEqual(got, expected)
# with prototype
ptin = POINT(101, 102)
ptout = POINT()
dll._testfunc_byval.argtypes = (POINT, POINTER(POINT))
dll._testfunc_byval.restype = c_int
result = dll._testfunc_byval(self.wrap(ptin), byref(ptout))
got = result, ptout.x, ptout.y
expected = 203, 101, 102
self.failUnlessEqual(got, expected)
def test_struct_return_2H(self):
class S2H(Structure):
_fields_ = [("x", c_short),
("y", c_short)]
dll.ret_2h_func.restype = S2H
dll.ret_2h_func.argtypes = [S2H]
inp = S2H(99, 88)
s2h = dll.ret_2h_func(self.wrap(inp))
self.failUnlessEqual((s2h.x, s2h.y), (99*2, 88*3))
def test_struct_return_8H(self):
class S8I(Structure):
_fields_ = [("a", c_int),
("b", c_int),
("c", c_int),
("d", c_int),
("e", c_int),
("f", c_int),
("g", c_int),
("h", c_int)]
dll.ret_8i_func.restype = S8I
dll.ret_8i_func.argtypes = [S8I]
inp = S8I(9, 8, 7, 6, 5, 4, 3, 2)
s8i = dll.ret_8i_func(self.wrap(inp))
self.failUnlessEqual((s8i.a, s8i.b, s8i.c, s8i.d, s8i.e, s8i.f, s8i.g, s8i.h),
(9*2, 8*3, 7*4, 6*5, 5*6, 4*7, 3*8, 2*9))
#~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
class AsParamWrapper(object):
def __init__(self, param):
self._as_parameter_ = param
class AsParamWrapperTestCase(BasicWrapTestCase):
wrap = AsParamWrapper
#~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
class AsParamPropertyWrapper(object):
def __init__(self, param):
self._param = param
def getParameter(self):
return self._param
_as_parameter_ = property(getParameter)
class AsParamPropertyWrapperTestCase(BasicWrapTestCase):
wrap = AsParamPropertyWrapper
#~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
if __name__ == '__main__':
unittest.main()
from ctypes import *
import unittest
import os
import ctypes
import _ctypes_test
class BITS(Structure):
_fields_ = [("A", c_int, 1),
("B", c_int, 2),
("C", c_int, 3),
("D", c_int, 4),
("E", c_int, 5),
("F", c_int, 6),
("G", c_int, 7),
("H", c_int, 8),
("I", c_int, 9),
("M", c_short, 1),
("N", c_short, 2),
("O", c_short, 3),
("P", c_short, 4),
("Q", c_short, 5),
("R", c_short, 6),
("S", c_short, 7)]
func = CDLL(_ctypes_test.__file__).unpack_bitfields
func.argtypes = POINTER(BITS), c_char
##for n in "ABCDEFGHIMNOPQRS":
## print n, hex(getattr(BITS, n).size), getattr(BITS, n).offset
class C_Test(unittest.TestCase):
def test_ints(self):
for i in range(512):
for name in "ABCDEFGHI":
b = BITS()
setattr(b, name, i)
self.failUnlessEqual((name, i, getattr(b, name)), (name, i, func(byref(b), name)))
def test_shorts(self):
for i in range(256):
for name in "MNOPQRS":
b = BITS()
setattr(b, name, i)
self.failUnlessEqual((name, i, getattr(b, name)), (name, i, func(byref(b), name)))
signed_int_types = (c_byte, c_short, c_int, c_long, c_longlong)
unsigned_int_types = (c_ubyte, c_ushort, c_uint, c_ulong, c_ulonglong)
int_types = unsigned_int_types + signed_int_types
class BitFieldTest(unittest.TestCase):
def test_longlong(self):
class X(Structure):
_fields_ = [("a", c_longlong, 1),
("b", c_longlong, 62),
("c", c_longlong, 1)]
self.failUnlessEqual(sizeof(X), sizeof(c_longlong))
x = X()
x.a, x.b, x.c = -1, 7, -1
self.failUnlessEqual((x.a, x.b, x.c), (-1, 7, -1))
def test_ulonglong(self):
class X(Structure):
_fields_ = [("a", c_ulonglong, 1),
("b", c_ulonglong, 62),
("c", c_ulonglong, 1)]
self.failUnlessEqual(sizeof(X), sizeof(c_longlong))
x = X()
self.failUnlessEqual((x.a, x.b, x.c), (0, 0, 0))
x.a, x.b, x.c = 7, 7, 7
self.failUnlessEqual((x.a, x.b, x.c), (1, 7, 1))
def test_signed(self):
for c_typ in signed_int_types:
class X(Structure):
_fields_ = [("dummy", c_typ),
("a", c_typ, 3),
("b", c_typ, 3),
("c", c_typ, 1)]
self.failUnlessEqual(sizeof(X), sizeof(c_typ)*2)
x = X()
self.failUnlessEqual((c_typ, x.a, x.b, x.c), (c_typ, 0, 0, 0))
x.a = -1
self.failUnlessEqual((c_typ, x.a, x.b, x.c), (c_typ, -1, 0, 0))
x.a, x.b = 0, -1
self.failUnlessEqual((c_typ, x.a, x.b, x.c), (c_typ, 0, -1, 0))
def test_unsigned(self):
for c_typ in unsigned_int_types:
class X(Structure):
_fields_ = [("a", c_typ, 3),
("b", c_typ, 3),
("c", c_typ, 1)]
self.failUnlessEqual(sizeof(X), sizeof(c_typ))
x = X()
self.failUnlessEqual((c_typ, x.a, x.b, x.c), (c_typ, 0, 0, 0))
x.a = -1
self.failUnlessEqual((c_typ, x.a, x.b, x.c), (c_typ, 7, 0, 0))
x.a, x.b = 0, -1
self.failUnlessEqual((c_typ, x.a, x.b, x.c), (c_typ, 0, 7, 0))
def fail_fields(self, *fields):
return self.get_except(type(Structure), "X", (),
{"_fields_": fields})
def test_nonint_types(self):
# bit fields are not allowed on non-integer types.
result = self.fail_fields(("a", c_char_p, 1))
self.failUnlessEqual(result, (TypeError, 'bit fields not allowed for type c_char_p'))
result = self.fail_fields(("a", c_void_p, 1))
self.failUnlessEqual(result, (TypeError, 'bit fields not allowed for type c_void_p'))
if c_int != c_long:
result = self.fail_fields(("a", POINTER(c_int), 1))
self.failUnlessEqual(result, (TypeError, 'bit fields not allowed for type LP_c_int'))
result = self.fail_fields(("a", c_char, 1))
self.failUnlessEqual(result, (TypeError, 'bit fields not allowed for type c_char'))
try:
c_wchar
except NameError:
pass
else:
result = self.fail_fields(("a", c_wchar, 1))
self.failUnlessEqual(result, (TypeError, 'bit fields not allowed for type c_wchar'))
class Dummy(Structure):
_fields_ = []
result = self.fail_fields(("a", Dummy, 1))
self.failUnlessEqual(result, (TypeError, 'bit fields not allowed for type Dummy'))
def test_single_bitfield_size(self):
for c_typ in int_types:
result = self.fail_fields(("a", c_typ, -1))
self.failUnlessEqual(result, (ValueError, 'number of bits invalid for bit field'))
result = self.fail_fields(("a", c_typ, 0))
self.failUnlessEqual(result, (ValueError, 'number of bits invalid for bit field'))
class X(Structure):
_fields_ = [("a", c_typ, 1)]
self.failUnlessEqual(sizeof(X), sizeof(c_typ))
class X(Structure):
_fields_ = [("a", c_typ, sizeof(c_typ)*8)]
self.failUnlessEqual(sizeof(X), sizeof(c_typ))
result = self.fail_fields(("a", c_typ, sizeof(c_typ)*8 + 1))
self.failUnlessEqual(result, (ValueError, 'number of bits invalid for bit field'))
def test_multi_bitfields_size(self):
class X(Structure):
_fields_ = [("a", c_short, 1),
("b", c_short, 14),
("c", c_short, 1)]
self.failUnlessEqual(sizeof(X), sizeof(c_short))
class X(Structure):
_fields_ = [("a", c_short, 1),
("a1", c_short),
("b", c_short, 14),
("c", c_short, 1)]
self.failUnlessEqual(sizeof(X), sizeof(c_short)*3)
self.failUnlessEqual(X.a.offset, 0)
self.failUnlessEqual(X.a1.offset, sizeof(c_short))
self.failUnlessEqual(X.b.offset, sizeof(c_short)*2)
self.failUnlessEqual(X.c.offset, sizeof(c_short)*2)
class X(Structure):
_fields_ = [("a", c_short, 3),
("b", c_short, 14),
("c", c_short, 14)]
self.failUnlessEqual(sizeof(X), sizeof(c_short)*3)
self.failUnlessEqual(X.a.offset, sizeof(c_short)*0)
self.failUnlessEqual(X.b.offset, sizeof(c_short)*1)
self.failUnlessEqual(X.c.offset, sizeof(c_short)*2)
def get_except(self, func, *args, **kw):
try:
func(*args, **kw)
except Exception, detail:
return detail.__class__, str(detail)
def test_mixed_1(self):
class X(Structure):
_fields_ = [("a", c_byte, 4),
("b", c_int, 4)]
if os.name in ("nt", "ce"):
self.failUnlessEqual(sizeof(X), sizeof(c_int)*2)
else:
self.failUnlessEqual(sizeof(X), sizeof(c_int))
def test_mixed_2(self):
class X(Structure):
_fields_ = [("a", c_byte, 4),
("b", c_int, 32)]
self.failUnlessEqual(sizeof(X), sizeof(c_int)*2)
def test_mixed_3(self):
class X(Structure):
_fields_ = [("a", c_byte, 4),
("b", c_ubyte, 4)]
self.failUnlessEqual(sizeof(X), sizeof(c_byte))
def test_anon_bitfields(self):
# anonymous bit-fields gave a strange error message
class X(Structure):
_fields_ = [("a", c_byte, 4),
("b", c_ubyte, 4)]
class Y(Structure):
_anonymous_ = ["_"]
_fields_ = [("_", X)]
if __name__ == "__main__":
unittest.main()
from ctypes import *
import unittest
class StringBufferTestCase(unittest.TestCase):
def test_buffer(self):
b = create_string_buffer(32)
self.failUnlessEqual(len(b), 32)
self.failUnlessEqual(sizeof(b), 32 * sizeof(c_char))
self.failUnless(type(b[0]) is str)
b = create_string_buffer("abc")
self.failUnlessEqual(len(b), 4) # trailing nul char
self.failUnlessEqual(sizeof(b), 4 * sizeof(c_char))
self.failUnless(type(b[0]) is str)
self.failUnlessEqual(b[0], "a")
self.failUnlessEqual(b[:], "abc\0")
def test_string_conversion(self):
b = create_string_buffer(u"abc")
self.failUnlessEqual(len(b), 4) # trailing nul char
self.failUnlessEqual(sizeof(b), 4 * sizeof(c_char))
self.failUnless(type(b[0]) is str)
self.failUnlessEqual(b[0], "a")
self.failUnlessEqual(b[:], "abc\0")
try:
c_wchar
except NameError:
pass
else:
def test_unicode_buffer(self):
b = create_unicode_buffer(32)
self.failUnlessEqual(len(b), 32)
self.failUnlessEqual(sizeof(b), 32 * sizeof(c_wchar))
self.failUnless(type(b[0]) is unicode)
b = create_unicode_buffer(u"abc")
self.failUnlessEqual(len(b), 4) # trailing nul char
self.failUnlessEqual(sizeof(b), 4 * sizeof(c_wchar))
self.failUnless(type(b[0]) is unicode)
self.failUnlessEqual(b[0], u"a")
self.failUnlessEqual(b[:], "abc\0")
def test_unicode_conversion(self):
b = create_unicode_buffer("abc")
self.failUnlessEqual(len(b), 4) # trailing nul char
self.failUnlessEqual(sizeof(b), 4 * sizeof(c_wchar))
self.failUnless(type(b[0]) is unicode)
self.failUnlessEqual(b[0], u"a")
self.failUnlessEqual(b[:], "abc\0")
if __name__ == "__main__":
unittest.main()
This diff is collapsed.
import unittest
from ctypes import *
import _ctypes_test
class Callbacks(unittest.TestCase):
functype = CFUNCTYPE
## def tearDown(self):
## import gc
## gc.collect()
def callback(self, *args):
self.got_args = args
return args[-1]
def check_type(self, typ, arg):
PROTO = self.functype.im_func(typ, typ)
result = PROTO(self.callback)(arg)
if typ == c_float:
self.failUnlessAlmostEqual(result, arg, places=5)
else:
self.failUnlessEqual(self.got_args, (arg,))
self.failUnlessEqual(result, arg)
PROTO = self.functype.im_func(typ, c_byte, typ)
result = PROTO(self.callback)(-3, arg)
if typ == c_float:
self.failUnlessAlmostEqual(result, arg, places=5)
else:
self.failUnlessEqual(self.got_args, (-3, arg))
self.failUnlessEqual(result, arg)
################
def test_byte(self):
self.check_type(c_byte, 42)
self.check_type(c_byte, -42)
def test_ubyte(self):
self.check_type(c_ubyte, 42)
def test_short(self):
self.check_type(c_short, 42)
self.check_type(c_short, -42)
def test_ushort(self):
self.check_type(c_ushort, 42)
def test_int(self):
self.check_type(c_int, 42)
self.check_type(c_int, -42)
def test_uint(self):
self.check_type(c_uint, 42)
def test_long(self):
self.check_type(c_long, 42)
self.check_type(c_long, -42)
def test_ulong(self):
self.check_type(c_ulong, 42)
def test_longlong(self):
self.check_type(c_longlong, 42)
self.check_type(c_longlong, -42)
def test_ulonglong(self):
self.check_type(c_ulonglong, 42)
def test_float(self):
# only almost equal: double -> float -> double
import math
self.check_type(c_float, math.e)
self.check_type(c_float, -math.e)
def test_double(self):
self.check_type(c_double, 3.14)
self.check_type(c_double, -3.14)
def test_char(self):
self.check_type(c_char, "x")
self.check_type(c_char, "a")
# disabled: would now (correctly) raise a RuntimeWarning about
# a memory leak. A callback function cannot return a non-integral
# C type without causing a memory leak.
## def test_char_p(self):
## self.check_type(c_char_p, "abc")
## self.check_type(c_char_p, "def")
def test_pyobject(self):
o = ()
from sys import getrefcount as grc
for o in (), [], object():
initial = grc(o)
# This call leaks a reference to 'o'...
self.check_type(py_object, o)
before = grc(o)
# ...but this call doesn't leak any more. Where is the refcount?
self.check_type(py_object, o)
after = grc(o)
self.failUnlessEqual((after, o), (before, o))
try:
WINFUNCTYPE
except NameError:
pass
else:
class StdcallCallbacks(Callbacks):
functype = WINFUNCTYPE
################################################################
class SampleCallbacksTestCase(unittest.TestCase):
def test_integrate(self):
# Derived from some then non-working code, posted by David Foster
dll = CDLL(_ctypes_test.__file__)
# The function prototype called by 'integrate': double func(double);
CALLBACK = CFUNCTYPE(c_double, c_double)
# The integrate function itself, exposed from the _ctypes_test dll
integrate = dll.integrate
integrate.argtypes = (c_double, c_double, CALLBACK, c_long)
integrate.restype = c_double
def func(x):
return x**2
result = integrate(0.0, 1.0, CALLBACK(func), 10)
diff = abs(result - 1./3.)
self.failUnless(diff < 0.01, "%s not less than 0.01" % diff)
################################################################
if __name__ == '__main__':
unittest.main()
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
import sys
from ctypes import *
##class HMODULE(Structure):
## _fields_ = [("value", c_void_p)]
## def __repr__(self):
## return "<HMODULE %s>" % self.value
##windll.kernel32.GetModuleHandleA.restype = HMODULE
##print windll.kernel32.GetModuleHandleA("python23.dll")
##print hex(sys.dllhandle)
##def nonzero(handle):
## return (GetLastError(), handle)
##windll.kernel32.GetModuleHandleA.errcheck = nonzero
##print windll.kernel32.GetModuleHandleA("spam")
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment