Code: Select all
pyb_i2s_obj_t *self = pos_args[0];
mp_arg_val_t args[MP_ARRAY_SIZE(allowed_args)];
mp_arg_parse_all(n_args - 1, pos_args + 1, kw_args, MP_ARRAY_SIZE(allowed_args), allowed_args, args);
mp_obj_t stream = args[0].u_obj;
mp_obj_type_t *type = mp_obj_get_type(stream);
if (type->stream_p == NULL || type->stream_p->read == NULL) {
nlr_raise(mp_obj_new_exception_msg_varg(&mp_type_ValueError, "Object type %s not a readable stream",
mp_obj_get_type_str(stream)));
}
...
self->out_sz = self->dstream_tx->type->stream_p->read(self->dstream_tx, &self->audiobuf_tx[self->pp_ptr], buf_sz, &error);
if (self->out_sz == MP_STREAM_ERROR) {
if (mp_is_nonblocking_error(error)) {
// nonblocking error behavior copied from py/stream.c: stream_read()
// see https://docs.python.org/3.4/library/io.html#io.RawIOBase.read
return mp_const_none;
}
nlr_raise(mp_obj_new_exception_arg1(&mp_type_OSError, MP_OBJ_NEW_SMALL_INT(error)));
}
Code: Select all
// Write `len` bytes from `buf` to the stream object `self_in` by calling the
// `write` entry of the object type's stream protocol.
//
// Returns:
//   - the value produced by the protocol's write(), wrapped as a small int,
//     when write() does not report MP_STREAM_ERROR;
//   - mp_const_none when write() reports MP_STREAM_ERROR and the error code
//     is classified as a non-blocking error by mp_is_nonblocking_error().
// Raises:
//   - OSError("Operation not supported") when the type has no stream protocol
//     or the protocol has no write entry;
//   - OSError(error) for any other MP_STREAM_ERROR result.
mp_obj_t mp_stream_write(mp_obj_t self_in, const void *buf, mp_uint_t len) {
    struct _mp_obj_base_t *base = (struct _mp_obj_base_t *)self_in;

    // Guard: the object must expose a stream protocol with a write method.
    if (base->type->stream_p == NULL || base->type->stream_p->write == NULL) {
        // CPython: io.UnsupportedOperation, OSError subclass
        nlr_raise(mp_obj_new_exception_msg(&mp_type_OSError, "Operation not supported"));
    }

    int err;
    mp_uint_t written = base->type->stream_p->write(self_in, buf, len, &err);

    // Common case first: the write went through, report its result.
    if (written != MP_STREAM_ERROR) {
        return MP_OBJ_NEW_SMALL_INT(written);
    }

    if (mp_is_nonblocking_error(err)) {
        // http://docs.python.org/3/library/io.html#io.RawIOBase.write
        // "None is returned if the raw stream is set not to block and
        // no single byte could be readily written to it."
        // Kept for consistency with read() behavior, even though it is
        // a somewhat odd convention.
        return mp_const_none;
    }

    // Any other failure is surfaced to the caller as OSError(err).
    nlr_raise(mp_obj_new_exception_arg1(&mp_type_OSError, MP_OBJ_NEW_SMALL_INT(err)));
}
Would it be a problem to introduce a public mp_stream_read() function, or should I be using mp_stream_read_obj instead? If the latter, any tips about how to implement that?
Thanks!
Bryan